Mirror of https://github.com/kestra-io/kestra.git (synced 2025-12-25 11:12:12 -05:00)

Compare commits: plugin/tem...v0.24.14 (191 commits)
| SHA1 |
|---|
| 4af50e31cc |
| a754b749f8 |
| 547731a2a6 |
| 18570b7512 |
| a1e3b912b3 |
| c04b9a4f17 |
| 65395feeab |
| d6e172f715 |
| 8a2271b587 |
| 0c89ba25e9 |
| 5a01961c55 |
| 96355b96f9 |
| 5b676409a4 |
| 8af1bdc2c6 |
| de9b4f0f30 |
| 1bd839556c |
| 761d208a08 |
| ba55930334 |
| ddc072009c |
| 0c7cf30df0 |
| b6e613d8b3 |
| ae547565cd |
| a3944aa710 |
| 3cec08b352 |
| 003a6359bd |
| 2b804d17e4 |
| 3722642977 |
| 7ac54bc08d |
| bbf55bee8e |
| 44579b1b62 |
| 374c589194 |
| a933ee6cbe |
| 9e4eafaf84 |
| aa8a54f51b |
| 2ecdb289b6 |
| 2f8e530ab1 |
| 6bf4aeb11b |
| fb70b7299e |
| 4bd261d650 |
| d4afed683c |
| e92b8da56a |
| 01a145ac2c |
| 59eed01e3a |
| 5efeae2a94 |
| 650e8fac17 |
| 73e59496cf |
| ed3748e60e |
| 3fcd8e9da7 |
| bc1a60233e |
| bd49bb8b7b |
| 36ba850d7f |
| cd080831c5 |
| 8282bf7940 |
| 0c494ebeef |
| cc58a765e4 |
| cbcd76416a |
| e98732e121 |
| 19e265ee1a |
| d7fbac8132 |
| 430bac5039 |
| 039ccf827a |
| e15287a79b |
| 798ed061d4 |
| 9717faaade |
| 631f0c5cdb |
| bbf619246e |
| cb307780e4 |
| 95f2bbea1d |
| 5cb9a340a7 |
| c5ccae287e |
| 40a9732dac |
| ae7aefcc17 |
| 0a81f67981 |
| c4757ed915 |
| 4577070e32 |
| 7bd519ddb4 |
| 62c85078b6 |
| 3718be9658 |
| 2df6c1b730 |
| 9af86ea677 |
| 8601905994 |
| 9de1a15d02 |
| 25b056ebb3 |
| 87d8f9867f |
| a00c1f8397 |
| f4470095ff |
| cbfaa8815d |
| 10e55bbb77 |
| 59d5d4cb91 |
| e8ee3b0a84 |
| 602ff849e3 |
| 155bdca83f |
| faaaeada3a |
| 6ef35974d7 |
| 46f9bb768f |
| ab87f63e8c |
| cdb73ccbd7 |
| 8fc936e0a3 |
| 1e0ebc94b8 |
| 5318592eff |
| 2da08f160d |
| 8cbc9e7aff |
| f8e15d103f |
| 49794a4f2a |
| bafa5fe03c |
| 208b244f0f |
| b93976091d |
| eec52d76f0 |
| b96fd87572 |
| 1aa5bfab43 |
| c4572e86a5 |
| f2f97bb70c |
| 804c740d3c |
| 75cd4f44e0 |
| f167a2a2bb |
| 08d9416e3a |
| 2a879c617c |
| 3227ca7c11 |
| 428a52ce02 |
| f58bc4caba |
| e99ae9513f |
| c8b51fcacf |
| 813b2f6439 |
| c6b5bca25b |
| de35d2cdb9 |
| a6ffbd59d0 |
| 568740a214 |
| aa0d2c545f |
| cda77d5146 |
| d4fd1f61ba |
| 9859ea5eb6 |
| aca374a28f |
| c413ba95e1 |
| 9c6b92619e |
| 8173e8df51 |
| 5c95505911 |
| 33f0b533bb |
| 23e35a7f97 |
| 0357321c58 |
| 5c08403398 |
| a63cb71218 |
| 317885b91c |
| 87637302e4 |
| 056faaaf9f |
| 54c74a1328 |
| fae0c88c5e |
| db5d83d1cb |
| 066b947762 |
| b6597475b1 |
| f2610baf15 |
| b619bf76d8 |
| 117f453a77 |
| 053d6276ff |
| 3870eca70b |
| afd7c216f9 |
| 59a17e88e7 |
| 99f8dca1c2 |
| 1068c9fe51 |
| ea6d30df7c |
| 04ba7363c2 |
| 281a987944 |
| c9ce54b0be |
| ccd9baef3c |
| 97869b9c75 |
| 1c681c1492 |
| de2a446f93 |
| d778947017 |
| 3f97845fdd |
| 631cd169a1 |
| 1648fa076c |
| 474806882e |
| 65467bd118 |
| 387bbb80ac |
| 19d4c64f19 |
| 809c0a228c |
| 6a045900fb |
| 4ada5fe8f3 |
| 998087ca30 |
| 146338e48f |
| de177b925e |
| 04bfb19095 |
| c913c48785 |
| 0d5b593d42 |
| 83f92535c5 |
| fd6a0a6c11 |
| 104c4c97b4 |
| 21cd21269f |
| 679befa2fe |
| 8a0ecdeb8a |
| ee8762e138 |
| d16324f265 |
.github/actions/plugins-list/action.yml (vendored, 29 lines removed)
@@ -1,29 +0,0 @@
name: 'Load Kestra Plugin List'
description: 'Composite action to load list of plugins'
inputs:
  plugin-version:
    description: "Kestra version"
    default: 'LATEST'
    required: true
  plugin-file:
    description: "File of the plugins"
    default: './.plugins'
    required: true
outputs:
  plugins:
    description: "List of all Kestra plugins"
    value: ${{ steps.plugins.outputs.plugins }}
  repositories:
    description: "List of all Kestra repositories of plugins"
    value: ${{ steps.plugins.outputs.repositories }}
runs:
  using: composite
  steps:
    - name: Get Plugins List
      id: plugins
      shell: bash
      run: |
        PLUGINS=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | sed -e "s/LATEST/${{ inputs.plugin-version }}/g" | cut -d':' -f2- | xargs || echo '');
        REPOSITORIES=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | cut -d':' -f1 | uniq | sort | xargs || echo '')
        echo "plugins=$PLUGINS" >> $GITHUB_OUTPUT
        echo "repositories=$REPOSITORIES" >> $GITHUB_OUTPUT
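For orientation, a minimal sketch of how a workflow could consume this composite action while it still existed; the job and step names here are illustrative assumptions, while the action path, `plugin-version` input, and `plugins`/`repositories` outputs come from the definition above (the `docker.yml` workflow further down shows a real call site):

```yaml
# Hypothetical caller; only ./.github/actions/plugins-list and its
# plugin-version input / plugins + repositories outputs are taken
# from the action definition above.
jobs:
  plugins:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Get Plugins List
        uses: ./.github/actions/plugins-list
        id: plugins
        with:
          plugin-version: 'LATEST'
      - name: Show resolved lists
        run: |
          echo "plugins: ${{ steps.plugins.outputs.plugins }}"
          echo "repositories: ${{ steps.plugins.outputs.repositories }}"
```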
.github/actions/setup-vars/action.yml (vendored, 20 lines removed)
@@ -1,20 +0,0 @@
name: 'Setup vars'
description: 'Composite action to setup common vars'
outputs:
  tag:
    description: "Git tag"
    value: ${{ steps.vars.outputs.tag }}
  commit:
    description: "Git commit"
    value: ${{ steps.vars.outputs.commit }}
runs:
  using: composite
  steps:
    # Setup vars
    - name: Set variables
      id: vars
      shell: bash
      run: |
        TAG=${GITHUB_REF#refs/*/}
        echo "tag=${TAG}" >> $GITHUB_OUTPUT
        echo "commit=$(git rev-parse --short "$GITHUB_SHA")" >> $GITHUB_OUTPUT
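Likewise, a minimal sketch of a caller for the `setup-vars` action above; the consuming step is an illustrative assumption, and a checkout must run first so `git rev-parse` can resolve `$GITHUB_SHA`:

```yaml
# Hypothetical caller; the tag / commit output names come from the
# action definition above, everything else is illustrative.
jobs:
  vars:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup vars
        uses: ./.github/actions/setup-vars
        id: vars
      - name: Use the outputs
        run: |
          echo "tag: ${{ steps.vars.outputs.tag }}"
          echo "commit: ${{ steps.vars.outputs.commit }}"
```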
.github/workflows/auto-translate-ui-keys.yml (vendored, 67 lines removed)
@@ -1,67 +0,0 @@
name: Auto-Translate UI keys and create PR

on:
  schedule:
    - cron: "0 9-21 * * *" # Every hour from 9 AM to 9 PM
  workflow_dispatch:
    inputs:
      retranslate_modified_keys:
        description: "Whether to re-translate modified keys even if they already have translations."
        type: choice
        options:
          - "false"
          - "true"
        default: "false"
        required: false

jobs:
  translations:
    name: Translations
    runs-on: ubuntu-latest
    timeout-minutes: 10
    steps:
      - uses: actions/checkout@v4
        name: Checkout
        with:
          fetch-depth: 0

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.x"

      - name: Install Python dependencies
        run: pip install gitpython openai

      - name: Generate translations
        run: python ui/src/translations/generate_translations.py ${{ github.event.inputs.retranslate_modified_keys }}
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

      - name: Set up Node
        uses: actions/setup-node@v4
        with:
          node-version: "20.x"

      - name: Set up Git
        run: |
          git config --global user.name "GitHub Action"
          git config --global user.email "actions@github.com"

      - name: Commit and create PR
        env:
          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: |
          BRANCH_NAME="chore/update-translations-$(date +%s)"
          git checkout -b $BRANCH_NAME
          git add ui/src/translations/*.json
          if git diff --cached --quiet; then
            echo "No changes to commit. Exiting with success."
            exit 0
          fi
          git commit -m "chore(core): localize to languages other than english" -m "Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference."
          git push -u origin $BRANCH_NAME || (git push origin --delete $BRANCH_NAME && git push -u origin $BRANCH_NAME)
          gh pr create --title "Translations from en.json" --body $'This PR was created automatically by a GitHub Action.\n\nSomeone from the @kestra-io/frontend team needs to review and merge.' --base ${{ github.ref_name }} --head $BRANCH_NAME

      - name: Check keys matching
        run: node ui/src/translations/check.js
.github/workflows/codeql-analysis.yml (vendored, 85 lines removed)
@@ -1,85 +0,0 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
name: "CodeQL"

on:
  schedule:
    - cron: '0 5 * * 1'

  workflow_dispatch: {}

jobs:
  analyze:
    name: Analyze
    runs-on: ubuntu-latest

    strategy:
      fail-fast: false
      matrix:
        # Override automatic language detection by changing the below list
        # Supported options are ['csharp', 'cpp', 'go', 'java', 'javascript', 'python']
        language: ['java', 'javascript']
        # Learn more...
        # https://docs.github.com/en/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#overriding-automatic-language-detection

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          # We must fetch at least the immediate parents so that if this is
          # a pull request then we can checkout the head.
          fetch-depth: 2

      # If this run was triggered by a pull request event, then checkout
      # the head of the pull request instead of the merge commit.
      - run: git checkout HEAD^2
        if: ${{ github.event_name == 'pull_request' }}

      # Initializes the CodeQL tools for scanning.
      - name: Initialize CodeQL
        uses: github/codeql-action/init@v3
        with:
          languages: ${{ matrix.language }}
          # If you wish to specify custom queries, you can do so here or in a config file.
          # By default, queries listed here will override any specified in a config file.
          # Prefix the list here with "+" to use these queries and those in the config file.
          # queries: ./path/to/local/query, your-org/your-repo/queries@main

      # Set up JDK
      - name: Set up JDK
        uses: actions/setup-java@v4
        if: ${{ matrix.language == 'java' }}
        with:
          distribution: 'temurin'
          java-version: 21

      - name: Setup gradle
        if: ${{ matrix.language == 'java' }}
        uses: gradle/actions/setup-gradle@v4

      - name: Build with Gradle
        if: ${{ matrix.language == 'java' }}
        run: ./gradlew testClasses -x :ui:assembleFrontend

      # Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
      # If this step fails, then you should remove it and run the build manually (see below)
      - name: Autobuild
        if: ${{ matrix.language != 'java' }}
        uses: github/codeql-action/autobuild@v3

      # ℹ️ Command-line programs to run using the OS shell.
      # 📚 https://git.io/JvXDl

      # ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
      #    and modify them (or add more) to build your code if your project
      #    uses a compiled language

      #- run: |
      #   make bootstrap
      #   make release

      - name: Perform CodeQL Analysis
        uses: github/codeql-action/analyze@v3
.github/workflows/docker.yml (vendored, 147 lines removed)
@@ -1,147 +0,0 @@
name: Create Docker images on Release

on:
  workflow_dispatch:
    inputs:
      retag-latest:
        description: 'Retag latest Docker images'
        required: true
        type: string
        default: "false"
        options:
          - "true"
          - "false"
      release-tag:
        description: 'Kestra Release Tag'
        required: false
        type: string
      plugin-version:
        description: 'Plugin version'
        required: false
        type: string
        default: "LATEST"
env:
  PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
jobs:
  plugins:
    name: List Plugins
    runs-on: ubuntu-latest
    outputs:
      plugins: ${{ steps.plugins.outputs.plugins }}
    steps:
      # Checkout
      - uses: actions/checkout@v4

      # Get Plugins List
      - name: Get Plugins List
        uses: ./.github/actions/plugins-list
        id: plugins
        with:
          plugin-version: ${{ env.PLUGIN_VERSION }}
  docker:
    name: Publish Docker
    needs: [ plugins ]
    runs-on: ubuntu-latest
    strategy:
      matrix:
        image:
          - name: "-no-plugins"
            plugins: ""
            packages: jattach
            python-libs: ""
          - name: ""
            plugins: ${{needs.plugins.outputs.plugins}}
            packages: python3 python-is-python3 python3-pip curl jattach
            python-libs: kestra
    steps:
      - uses: actions/checkout@v4

      # Vars
      - name: Set image name
        id: vars
        run: |
          if [[ "${{ inputs.release-tag }}" == "" ]]; then
            TAG=${GITHUB_REF#refs/*/}
            echo "tag=${TAG}" >> $GITHUB_OUTPUT
          else
            TAG="${{ inputs.release-tag }}"
            echo "tag=${TAG}" >> $GITHUB_OUTPUT
          fi

          if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
          else
            echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
          fi
      # Download release
      - name: Download release
        uses: robinraju/release-downloader@v1.12
        with:
          tag: ${{steps.vars.outputs.tag}}
          fileName: 'kestra-*'
          out-file-path: build/executable

      - name: Copy exe to image
        run: |
          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra

      # Docker setup
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3

      - name: Docker - Fix Qemu
        shell: bash
        run: |
          docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      # Docker Login
      - name: Login to DockerHub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_PASSWORD }}

      # Docker Build and push
      - name: Push to Docker Hub
        uses: docker/build-push-action@v6
        with:
          context: .
          push: true
          tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
          platforms: linux/amd64,linux/arm64
          build-args: |
            KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
            APT_PACKAGES=${{ matrix.image.packages }}
            PYTHON_LIBRARIES=${{ matrix.image.python-libs }}

      - name: Install regctl
        if: github.event.inputs.retag-latest == 'true'
        uses: regclient/actions/regctl-installer@main

      - name: Retag to latest
        if: github.event.inputs.retag-latest == 'true'
        run: |
          regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}

  end:
    runs-on: ubuntu-latest
    needs:
      - docker
    if: always()
    env:
      SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
    steps:
      # Slack
      - name: Slack notification
        uses: Gamesight/slack-workflow-status@master
        if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
        with:
          repo_token: ${{ secrets.GITHUB_TOKEN }}
          slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
          name: GitHub Actions
          icon_emoji: ':github-actions:'
          channel: 'C02DQ1A7JLR' # _int_git channel
.github/workflows/e2e.yml (vendored, 86 lines removed)
@@ -1,86 +0,0 @@
name: 'E2E tests revival'
description: 'New E2E tests implementation started by Roman. Based on playwright in npm UI project, tests Kestra OSS develop docker image. These tests are written from zero, lets make them unflaky from the start!.'
on:
  schedule:
    - cron: "0 * * * *" # Every hour
  workflow_call:
    inputs:
      noInputYet:
        description: 'not input yet.'
        required: false
        type: string
        default: "no input"
  workflow_dispatch:
    inputs:
      noInputYet:
        description: 'not input yet.'
        required: false
        type: string
        default: "no input"
jobs:
  check:
    timeout-minutes: 10
    runs-on: ubuntu-latest
    env:
      GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
    steps:
      - name: Login to DockerHub
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ github.token }}

      - name: Checkout kestra
        uses: actions/checkout@v4
        with:
          path: kestra

      # Setup build
      - uses: kestra-io/actions/.github/actions/setup-build@main
        name: Setup - Build
        id: build
        with:
          java-enabled: true
          node-enabled: true
          python-enabled: true

      - name: Install Npm dependencies
        run: |
          cd kestra/ui
          npm i
          npx playwright install --with-deps chromium

      - name: Run E2E Tests
        run: |
          cd kestra
          sh build-and-start-e2e-tests.sh

      - name: Upload Playwright Report as Github artifact
        # 'With this report, you can analyze locally the results of the tests. see https://playwright.dev/docs/ci-intro#html-report'
        uses: actions/upload-artifact@v4
        if: ${{ !cancelled() }}
        with:
          name: playwright-report
          path: kestra/ui/playwright-report/
          retention-days: 7
      # Allure check
      # TODO I don't know what it should do
      # - uses: rlespinasse/github-slug-action@v5
      #   name: Allure - Generate slug variables
      #
      # - name: Allure - Publish report
      #   uses: andrcuns/allure-publish-action@v2.9.0
      #   if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
      #   continue-on-error: true
      #   env:
      #     GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
      #     JAVA_HOME: /usr/lib/jvm/default-jvm/
      #   with:
      #     storageType: gcs
      #     resultsGlob: "**/build/allure-results"
      #     bucket: internal-kestra-host
      #     baseUrl: "https://internal.dev.kestra.io"
      #     prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
      #     copyLatest: true
      #     ignoreMissingResults: true
.github/workflows/gradle-release-plugins.yml (vendored, 82 lines removed)
@@ -1,82 +0,0 @@
name: Run Gradle Release for Kestra Plugins

on:
  workflow_dispatch:
    inputs:
      releaseVersion:
        description: 'The release version (e.g., 0.21.0-rc1)'
        required: true
        type: string
      nextVersion:
        description: 'The next version (e.g., 0.22.0-SNAPSHOT)'
        required: true
        type: string
      dryRun:
        description: 'Use DRY_RUN mode'
        required: false
        default: 'false'
jobs:
  release:
    name: Release plugins
    runs-on: ubuntu-latest
    steps:
      # Checkout
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      # Checkout GitHub Actions
      - uses: actions/checkout@v4
        with:
          repository: kestra-io/actions
          path: actions
          ref: main

      # Setup build
      - uses: ./actions/.github/actions/setup-build
        id: build
        with:
          java-enabled: true
          node-enabled: true
          python-enabled: true
          caches-enabled: true

      # Get Plugins List
      - name: Get Plugins List
        uses: ./.github/actions/plugins-list
        id: plugins-list
        with:
          plugin-version: 'LATEST'

      - name: 'Configure Git'
        run: |
          git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
          git config --global user.name "github-actions[bot]"

      # Execute
      - name: Run Gradle Release
        if: ${{ github.event.inputs.dryRun == 'false' }}
        env:
          GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
        run: |
          chmod +x ./dev-tools/release-plugins.sh;

          ./dev-tools/release-plugins.sh \
            --release-version=${{github.event.inputs.releaseVersion}} \
            --next-version=${{github.event.inputs.nextVersion}} \
            --yes \
            ${{ steps.plugins-list.outputs.repositories }}

      - name: Run Gradle Release (DRY_RUN)
        if: ${{ github.event.inputs.dryRun == 'true' }}
        env:
          GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
        run: |
          chmod +x ./dev-tools/release-plugins.sh;

          ./dev-tools/release-plugins.sh \
            --release-version=${{github.event.inputs.releaseVersion}} \
            --next-version=${{github.event.inputs.nextVersion}} \
            --dry-run \
            --yes \
            ${{ steps.plugins-list.outputs.repositories }}
.github/workflows/gradle-release.yml (vendored, 92 lines removed)
@@ -1,92 +0,0 @@
name: Run Gradle Release
run-name: "Releasing Kestra ${{ github.event.inputs.releaseVersion }} 🚀"
on:
  workflow_dispatch:
    inputs:
      releaseVersion:
        description: 'The release version (e.g., 0.21.0-rc1)'
        required: true
        type: string
      nextVersion:
        description: 'The next version (e.g., 0.22.0-SNAPSHOT)'
        required: true
        type: string
env:
  RELEASE_VERSION: "${{ github.event.inputs.releaseVersion }}"
  NEXT_VERSION: "${{ github.event.inputs.nextVersion }}"
jobs:
  release:
    name: Release Kestra
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/develop'
    steps:
      # Checks
      - name: Check Inputs
        run: |
          if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$ ]]; then
            echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$"
            exit 1
          fi

          if ! [[ "$NEXT_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$ ]]; then
            echo "Invalid next version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$"
            exit 1;
          fi
      # Checkout
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
          path: kestra

      # Checkout GitHub Actions
      - uses: actions/checkout@v4
        with:
          repository: kestra-io/actions
          path: actions
          ref: main

      # Setup build
      - uses: ./actions/.github/actions/setup-build
        id: build
        with:
          java-enabled: true
          node-enabled: true
          python-enabled: true
          caches-enabled: true

      - name: Configure Git
        run: |
          git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
          git config --global user.name "github-actions[bot]"

      # Execute
      - name: Run Gradle Release
        env:
          GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
        run: |
          # Extract the major and minor versions
          BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
          PUSH_RELEASE_BRANCH="releases/v${BASE_VERSION}.x"

          cd kestra

          # Create and push release branch
          git checkout -b "$PUSH_RELEASE_BRANCH";
          git push -u origin "$PUSH_RELEASE_BRANCH";

          # Run gradle release
          git checkout develop;

          if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
            # -SNAPSHOT qualifier maybe used to test release-candidates
            ./gradlew release -Prelease.useAutomaticVersion=true \
              -Prelease.releaseVersion="${RELEASE_VERSION}" \
              -Prelease.newVersion="${NEXT_VERSION}" \
              -Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}" \
              -Prelease.failOnSnapshotDependencies=false
          else
            ./gradlew release -Prelease.useAutomaticVersion=true \
              -Prelease.releaseVersion="${RELEASE_VERSION}" \
              -Prelease.newVersion="${NEXT_VERSION}" \
              -Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
          fi
.github/workflows/main-build.yml (vendored, new file, 86 lines added)
@@ -0,0 +1,86 @@
name: Main Workflow

on:
  push:
    branches:
      - releases/*
      - develop

  workflow_dispatch:
    inputs:
      skip-test:
        description: 'Skip test'
        type: choice
        required: true
        default: 'false'
        options:
          - "true"
          - "false"

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}-main
  cancel-in-progress: true

jobs:
  backend-tests:
    name: Backend tests
    if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
    uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
    secrets:
      GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
      CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
      SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
      GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}

  frontend-tests:
    name: Frontend tests
    if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
    uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
    secrets:
      CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
      GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}

  publish-develop-docker:
    name: Publish Docker
    needs: [backend-tests, frontend-tests]
    if: "!failure() && !cancelled() && github.ref == 'refs/heads/develop'"
    uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
    with:
      plugin-version: 'LATEST-SNAPSHOT'
    secrets:
      DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
      DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
      SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

  publish-develop-maven:
    name: Publish develop Maven
    needs: [ backend-tests, frontend-tests ]
    if: "!failure() && !cancelled() && github.ref == 'refs/heads/develop'"
    uses: kestra-io/actions/.github/workflows/kestra-oss-publish-maven.yml@main
    secrets:
      SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
      SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
      SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
      SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
      SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}

  end:
    runs-on: ubuntu-latest
    needs: [publish-develop-docker, publish-develop-maven]
    if: always()
    steps:
      - name: Trigger EE Workflow
        uses: peter-evans/repository-dispatch@v3
        if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
        with:
          token: ${{ secrets.GH_PERSONAL_TOKEN }}
          repository: kestra-io/kestra-ee
          event-type: "oss-updated"

      # Slack
      - name: Slack - Notification
        if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
        uses: kestra-io/actions/composite/slack-status@main
        with:
          webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
.github/workflows/main.yml (vendored, 73 lines removed)
@@ -1,73 +0,0 @@
name: Main Workflow

on:
  workflow_dispatch:
    inputs:
      plugin-version:
        description: "Kestra version"
        default: 'LATEST'
        required: true
        type: string
  push:
    branches:
      - master
      - main
      - releases/*
      - develop
    tags:
      - v*

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}-main
  cancel-in-progress: true

jobs:
  tests:
    name: Execute tests
    uses: ./.github/workflows/workflow-test.yml
    with:
      report-status: false

  release:
    name: Release
    needs: [tests]
    if: "!startsWith(github.ref, 'refs/heads/releases')"
    uses: ./.github/workflows/workflow-release.yml
    with:
      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
    secrets:
      DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
      DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
      SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
      SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
      SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
      SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
      SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}

  end:
    runs-on: ubuntu-latest
    needs:
      - release
    if: always()
    env:
      SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
    steps:
      - name: Trigger EE Workflow
        uses: peter-evans/repository-dispatch@v3
        if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
        with:
          token: ${{ secrets.GH_PERSONAL_TOKEN }}
          repository: kestra-io/kestra-ee
          event-type: "oss-updated"

      # Slack
      - name: Slack - Notification
        uses: Gamesight/slack-workflow-status@master
        if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
        with:
          repo_token: ${{ secrets.GITHUB_TOKEN }}
          slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
          name: GitHub Actions
          icon_emoji: ":github-actions:"
          channel: "C02DQ1A7JLR" # _int_git channel
.github/workflows/pre-release.yml (vendored, new file, 60 lines added)
@@ -0,0 +1,60 @@
name: Pre Release

on:
  push:
    tags:
      - 'v*'
  workflow_dispatch:
    inputs:
      skip-test:
        description: 'Skip test'
        type: choice
        required: true
        default: 'false'
        options:
          - "true"
          - "false"

jobs:
  build-artifacts:
    name: Build Artifacts
    uses: kestra-io/actions/.github/workflows/kestra-oss-build-artifacts.yml@main

  backend-tests:
    name: Backend tests
    uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
    if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
    secrets:
      GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
      CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
      SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
      GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}

  frontend-tests:
    name: Frontend tests
    uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
    if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
    secrets:
      GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
      CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

  publish-maven:
    name: Publish Maven
    needs: [ backend-tests, frontend-tests ]
    if: "!failure() && !cancelled()"
    uses: kestra-io/actions/.github/workflows/kestra-oss-publish-maven.yml@main
    secrets:
      SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
      SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
      SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
      SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
      SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}

  publish-github:
    name: Github Release
    needs: [build-artifacts, backend-tests, frontend-tests]
    if: "!failure() && !cancelled()"
    uses: kestra-io/actions/.github/workflows/kestra-oss-publish-github.yml@main
    secrets:
      GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
      SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
.github/workflows/pull-request-cleanup.yml (vendored, new file, 16 lines added)
@@ -0,0 +1,16 @@
name: Pull Request - Delete Docker

on:
  pull_request:
    types: [closed]
# TODO import a reusable one
jobs:
  publish:
    name: Pull Request - Delete Docker
    if: github.repository == 'kestra-io/kestra' # prevent running on forks
    runs-on: ubuntu-latest
    steps:
      - uses: dataaxiom/ghcr-cleanup-action@v1
        with:
          package: kestra-pr
          delete-tags: ${{ github.event.pull_request.number }}
.github/workflows/pull-request.yml (vendored, 33 lines changed)

@@ -2,17 +2,12 @@ name: Pull Request Workflow

on:
  pull_request:
    branches:
      - develop

concurrency:
  group: ${{ github.workflow }}-${{ github.ref_name }}-pr
  cancel-in-progress: true

jobs:
  # ********************************************************************************************************************
  # File changes detection
  # ********************************************************************************************************************
  file-changes:
    if: ${{ github.event.pull_request.draft == false }}
    name: File changes detection

@@ -33,14 +28,11 @@ jobs:
            - '!{ui,.github}/**'
        token: ${{ secrets.GITHUB_TOKEN }}

  # ********************************************************************************************************************
  # Tests
  # ********************************************************************************************************************
  frontend:
    name: Frontend - Tests
    needs: [file-changes]
    if: "needs.file-changes.outputs.ui == 'true'"
    uses: ./.github/workflows/workflow-frontend-test.yml
    uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
    secrets:
      GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
      CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

@@ -49,7 +41,7 @@
    name: Backend - Tests
    needs: file-changes
    if: "needs.file-changes.outputs.backend == 'true'"
    uses: ./.github/workflows/workflow-backend-test.yml
    uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
    secrets:
      GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
      CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

@@ -58,21 +50,8 @@

  e2e-tests:
    name: E2E - Tests
    uses: ./.github/workflows/e2e.yml
    uses: kestra-io/actions/.github/workflows/kestra-oss-e2e-tests.yml@main

  end:
    name: End
    runs-on: ubuntu-latest
    if: always()
    needs: [frontend, backend]
    steps:
      # Slack
      - name: Slack notification
        uses: Gamesight/slack-workflow-status@master
        if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
        with:
          repo_token: ${{ secrets.GITHUB_TOKEN }}
          slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
          name: GitHub Actions
          icon_emoji: ":github-actions:"
          channel: "C02DQ1A7JLR"
  generate-pull-request-docker-image:
    name: Generate PR docker image
    uses: kestra-io/actions/.github/workflows/kestra-oss-pullrequest-publish-docker.yml@main
.github/workflows/release-docker.yml (vendored, new file, 40 lines added)
@@ -0,0 +1,40 @@
name: Publish docker

on:
  workflow_dispatch:
    inputs:
      retag-latest:
        description: 'Retag latest Docker images'
        required: true
        type: boolean
        default: false
      retag-lts:
        description: 'Retag LTS Docker images'
        required: true
        type: boolean
        default: false
      plugin-version:
        description: '(deprecated) Plugin version window for old Kestra releases using .plugins file (0.22 to 0.24). If omitted, then plugin list will be fetched from the API compatible versions endpoint'
        required: false
        type: string
        default: "[0.24,1.0)"
      dry-run:
        description: 'Dry run mode that will not write or release anything'
        required: true
        type: boolean
        default: false

jobs:
  publish-docker:
    name: Publish Docker
    if: startsWith(github.ref, 'refs/tags/v')
    uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
    with:
      plugin-version: ${{ inputs.plugin-version }}
      retag-latest: ${{ inputs.retag-latest }}
      retag-lts: ${{ inputs.retag-lts }}
      dry-run: ${{ inputs.dry-run }}
    secrets:
      DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
      DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
      SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
.github/workflows/setversion-tag-plugins.yml (vendored, 60 lines removed)
@@ -1,60 +0,0 @@
name: Set Version and Tag Plugins

on:
  workflow_dispatch:
    inputs:
      releaseVersion:
        description: 'The release version (e.g., 0.21.0)'
        required: true
        type: string
      dryRun:
        description: 'Use DRY_RUN mode'
        required: false
        default: 'false'
jobs:
  tag:
    name: Release plugins
    runs-on: ubuntu-latest
    steps:
      # Checkout
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      # Get Plugins List
      - name: Get Plugins List
        uses: ./.github/actions/plugins-list
        id: plugins-list
        with:
          plugin-version: 'LATEST'

      - name: 'Configure Git'
        run: |
          git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
          git config --global user.name "github-actions[bot]"

      # Execute
      - name: Set Version and Tag Plugins
        if: ${{ github.event.inputs.dryRun == 'false' }}
        env:
          GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
        run: |
          chmod +x ./dev-tools/setversion-tag-plugins.sh;

          ./dev-tools/setversion-tag-plugins.sh \
            --release-version=${{github.event.inputs.releaseVersion}} \
            --yes \
            ${{ steps.plugins-list.outputs.repositories }}

      - name: Set Version and Tag Plugins (DRY_RUN)
        if: ${{ github.event.inputs.dryRun == 'true' }}
        env:
          GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
        run: |
          chmod +x ./dev-tools/setversion-tag-plugins.sh;

          ./dev-tools/setversion-tag-plugins.sh \
            --release-version=${{github.event.inputs.releaseVersion}} \
            --dry-run \
            --yes \
            ${{ steps.plugins-list.outputs.repositories }}
.github/workflows/setversion-tag.yml (vendored, 57 lines removed)
@@ -1,57 +0,0 @@
name: Set Version and Tag
run-name: "Set version and Tag Kestra to ${{ github.event.inputs.releaseVersion }} 🚀"
on:
  workflow_dispatch:
    inputs:
      releaseVersion:
        description: 'The release version (e.g., 0.21.1)'
        required: true
        type: string
env:
  RELEASE_VERSION: "${{ github.event.inputs.releaseVersion }}"
jobs:
  release:
    name: Release Kestra
    runs-on: ubuntu-latest
    if: startsWith(github.ref, 'refs/heads/releases/v')
    steps:
      # Checks
      - name: Check Inputs
        run: |
          if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)(\.[0-9]+)(-rc[0-9])?(-SNAPSHOT)?$ ]]; then
            echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)(\.[0-9]+)-(rc[0-9])?(-SNAPSHOT)?$"
            exit 1
          fi

          # Extract the major and minor versions
          BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
          RELEASE_BRANCH="refs/heads/releases/v${BASE_VERSION}.x"

          CURRENT_BRANCH="$GITHUB_REF"
          if ! [[ "$CURRENT_BRANCH" == "$RELEASE_BRANCH" ]]; then
            echo "Invalid release branch. Expected $RELEASE_BRANCH, was $CURRENT_BRANCH"
            exit 1
          fi

      # Checkout
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Configure Git
        run: |
          git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
          git config --global user.name "github-actions[bot]"

      # Execute
      - name: Run Gradle Release
        env:
          GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
        run: |
          # Update version
          sed -i "s/^version=.*/version=$RELEASE_VERSION/" ./gradle.properties
          git add ./gradle.properties
          git commit -m"chore(version): update to version '$RELEASE_VERSION'"
          git push
          git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
          git push --tags
.github/workflows/vulnerabilities-check.yml (vendored, 101 lines changed)

@@ -17,24 +17,16 @@ jobs:
    runs-on: ubuntu-latest
    steps:
      # Checkout
      - uses: actions/checkout@v4
      - uses: actions/checkout@v5
        with:
          fetch-depth: 0

      # Checkout GitHub Actions
      - uses: actions/checkout@v4
        with:
          repository: kestra-io/actions
          path: actions
          ref: main

      # Setup build
      - uses: ./actions/.github/actions/setup-build
      - uses: kestra-io/actions/composite/setup-build@main
        id: build
        with:
          java-enabled: true
          node-enabled: true
          caches-enabled: true

      # Npm
      - name: Npm - Install

@@ -56,92 +48,3 @@ jobs:
        with:
          name: dependency-check-report
          path: build/reports/dependency-check-report.html

  develop-image-check:
    name: Image Check (develop)
    runs-on: ubuntu-latest
    permissions:
      contents: read
      security-events: write
      actions: read
    steps:
      # Checkout
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      # Checkout GitHub Actions
      - uses: actions/checkout@v4
        with:
          repository: kestra-io/actions
          path: actions
          ref: main

      # Setup build
      - uses: ./actions/.github/actions/setup-build
        id: build
        with:
          java-enabled: false
          node-enabled: false
          caches-enabled: true

      # Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
      - name: Docker Vulnerabilities Check
        uses: aquasecurity/trivy-action@0.32.0
        with:
          image-ref: kestra/kestra:develop
          format: 'template'
          template: '@/contrib/sarif.tpl'
          severity: 'CRITICAL,HIGH'
          output: 'trivy-results.sarif'
          skip-dirs: /app/plugins

      - name: Upload Trivy scan results to GitHub Security tab
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: 'trivy-results.sarif'
          category: docker-

  latest-image-check:
    name: Image Check (latest)
    runs-on: ubuntu-latest
    permissions:
      contents: read
      security-events: write
      actions: read
    steps:
      # Checkout
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      # Checkout GitHub Actions
      - uses: actions/checkout@v4
        with:
          repository: kestra-io/actions
          path: actions
          ref: main

      # Setup build
      - uses: ./actions/.github/actions/setup-build
        id: build
        with:
          java-enabled: false
          node-enabled: false
          caches-enabled: true

      # Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
      - name: Docker Vulnerabilities Check
        uses: aquasecurity/trivy-action@0.32.0
        with:
          image-ref: kestra/kestra:latest
          format: table
          skip-dirs: /app/plugins
          scanners: vuln
          severity: 'CRITICAL,HIGH'
          output: 'trivy-results.sarif'

      - name: Upload Trivy scan results to GitHub Security tab
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: 'trivy-results.sarif'
.github/workflows/workflow-backend-test.yml (vendored, 142 lines removed)
@@ -1,142 +0,0 @@
name: Backend - Tests

on:
  workflow_call:
    secrets:
      GITHUB_AUTH_TOKEN:
        description: "The GitHub Token."
        required: true
      CODECOV_TOKEN:
        description: 'Codecov Token'
        required: true
      SONAR_TOKEN:
        description: 'Sonar Token'
        required: true
      GOOGLE_SERVICE_ACCOUNT:
        description: 'Google Service Account'
        required: true

permissions:
  contents: write
  checks: write
  actions: read

jobs:
  test:
    name: Backend - Tests
    runs-on: ubuntu-latest
    env:
      GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
      SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
    steps:
      - uses: actions/checkout@v4
        name: Checkout - Current ref
        with:
          fetch-depth: 0

      # Setup build
      - uses: kestra-io/actions/.github/actions/setup-build@main
        name: Setup - Build
        id: build
        with:
          java-enabled: true
          node-enabled: true
          python-enabled: true

      # Services
      - name: Setup - Start docker compose
        shell: bash
        run: docker compose -f docker-compose-ci.yml up -d

      # Gradle check
      - name: Gradle - Build
        if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
        env:
          GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
        shell: bash
        run: |
          echo $GOOGLE_SERVICE_ACCOUNT | base64 -d > ~/.gcp-service-account.json
          export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
          ./gradlew check javadoc --parallel

      # report test
      - name: Test - Publish Test Results
        uses: dorny/test-reporter@v2
        if: always()
        with:
          name: Java Tests Report
          reporter: java-junit
          path: '**/build/test-results/test/TEST-*.xml'
          list-suites: 'failed'
          list-tests: 'failed'
          fail-on-error: 'false'
          token: ${{ secrets.GITHUB_AUTH_TOKEN }}

      # Sonar
      - name: Test - Analyze with Sonar
        if: env.SONAR_TOKEN != ''
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
        shell: bash
        run: ./gradlew sonar --info

      # GCP
      - name: GCP - Auth with unit test account
        id: auth
        if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
        continue-on-error: true
        uses: "google-github-actions/auth@v2"
        with:
          credentials_json: "${{ secrets.GOOGLE_SERVICE_ACCOUNT }}"

      - name: GCP - Setup Cloud SDK
        if: env.GOOGLE_SERVICE_ACCOUNT != ''
        uses: "google-github-actions/setup-gcloud@v2"

      # Allure check
      - uses: rlespinasse/github-slug-action@v5
        name: Allure - Generate slug variables

      - name: Allure - Publish report
        uses: andrcuns/allure-publish-action@v2.9.0
        if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
        continue-on-error: true
        env:
          GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
          JAVA_HOME: /usr/lib/jvm/default-jvm/
        with:
          storageType: gcs
          resultsGlob: "**/build/allure-results"
          bucket: internal-kestra-host
          baseUrl: "https://internal.dev.kestra.io"
          prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
          copyLatest: true
          ignoreMissingResults: true

      # Jacoco
      - name: Jacoco - Copy reports
        if: env.GOOGLE_SERVICE_ACCOUNT != ''
        continue-on-error: true
        shell: bash
        run: |
          mv build/reports/jacoco/testCodeCoverageReport build/reports/jacoco/test/
          mv build/reports/jacoco/test/testCodeCoverageReport.xml build/reports/jacoco/test/jacocoTestReport.xml
          gsutil -m rsync -d -r build/reports/jacoco/test/ gs://internal-kestra-host/${{ format('{0}/{1}', github.repository, 'jacoco') }}

      # Codecov
      - name: Codecov - Upload coverage reports
        uses: codecov/codecov-action@v5
        if: ${{ !cancelled() }}
        continue-on-error: true
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          flags: backend

      - name: Codecov - Upload test results
        uses: codecov/test-results-action@v1
        if: ${{ !cancelled() }}
        continue-on-error: true
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          flags: backend
.github/workflows/workflow-build-artifacts.yml (vendored, 152 lines removed)
@@ -1,152 +0,0 @@
name: Build Artifacts

on:
  workflow_call:
    inputs:
      plugin-version:
        description: "Kestra version"
        default: 'LATEST'
        required: true
        type: string
    outputs:
      docker-tag:
        value: ${{ jobs.build.outputs.docker-tag }}
        description: "The Docker image Tag for Kestra"
      docker-artifact-name:
        value: ${{ jobs.build.outputs.docker-artifact-name }}
        description: "The GitHub artifact containing the Kestra docker image name."
      plugins:
        value: ${{ jobs.build.outputs.plugins }}
        description: "The Kestra plugins list used for the build."

jobs:
  build:
    name: Build - Artifacts
    runs-on: ubuntu-latest
    outputs:
      docker-tag: ${{ steps.vars.outputs.tag }}
      docker-artifact-name: ${{ steps.vars.outputs.artifact }}
      plugins: ${{ steps.plugins.outputs.plugins }}
    env:
      PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
    steps:
      - name: Checkout - Current ref
        uses: actions/checkout@v4
        with:
          fetch-depth: 0

      # Npm
      - name: Setup - Npm install
        shell: bash
        working-directory: ui
        run: npm ci

      # Setup build
      - uses: kestra-io/actions/.github/actions/setup-build@main
        name: Setup - Build
        id: build
        with:
          java-enabled: true
          node-enabled: true

      # Get Plugins List
      - name: Plugins - Get List
        uses: ./.github/actions/plugins-list
        if: "!startsWith(github.ref, 'refs/tags/v')"
        id: plugins-list
        with:
          plugin-version: ${{ env.PLUGIN_VERSION }}

      # Set Plugins List
      - name: Plugins - Set List
        id: plugins
        if: "!startsWith(github.ref, 'refs/tags/v')"
        shell: bash
        run: |
          PLUGINS="${{ steps.plugins-list.outputs.plugins }}"
          TAG=${GITHUB_REF#refs/*/}
          if [[ $TAG = "master" || $TAG == v* ]]; then
            echo "plugins=$PLUGINS" >> $GITHUB_OUTPUT
          else
            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
          fi

      # Build
      - name: Gradle - Build
        shell: bash
        run: |
          ./gradlew executableJar

      - name: Artifacts - Copy exe to image
        shell: bash
        run: |
          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra

      # Docker Tag
      - name: Setup - Docker vars
        id: vars
        shell: bash
        run: |
          TAG=${GITHUB_REF#refs/*/}
          if [[ $TAG = "master" ]]
          then
            TAG="latest";
          elif [[ $TAG = "develop" ]]
          then
            TAG="develop";
          elif [[ $TAG = v* ]]
          then
            TAG="${TAG}";
          else
            TAG="build-${{ github.run_id }}";
          fi
          echo "tag=${TAG}" >> $GITHUB_OUTPUT
          echo "artifact=docker-kestra-${TAG}" >> $GITHUB_OUTPUT

      # Docker setup
      - name: Docker - Setup QEMU
        uses: docker/setup-qemu-action@v3

      - name: Docker - Fix Qemu
        shell: bash
        run: |
          docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes

      - name: Docker - Setup Buildx
        uses: docker/setup-buildx-action@v3

      # Docker Build
      - name: Docker - Build & export image
        uses: docker/build-push-action@v6
        if: "!startsWith(github.ref, 'refs/tags/v')"
        with:
          context: .
          push: false
          file: Dockerfile
          tags: |
            kestra/kestra:${{ steps.vars.outputs.tag }}
          build-args: |
            KESTRA_PLUGINS=${{ steps.plugins.outputs.plugins }}
            APT_PACKAGES=${{ env.DOCKER_APT_PACKAGES }}
            PYTHON_LIBRARIES=${{ env.DOCKER_PYTHON_LIBRARIES }}
          outputs: type=docker,dest=/tmp/${{ steps.vars.outputs.artifact }}.tar

      # Upload artifacts
      - name: Artifacts - Upload JAR
        uses: actions/upload-artifact@v4
        with:
          name: jar
          path: build/libs/

      - name: Artifacts - Upload Executable
        uses: actions/upload-artifact@v4
        with:
          name: exe
          path: build/executable/

      - name: Artifacts - Upload Docker
        uses: actions/upload-artifact@v4
        if: "!startsWith(github.ref, 'refs/tags/v')"
        with:
          name: ${{ steps.vars.outputs.artifact }}
          path: /tmp/${{ steps.vars.outputs.artifact }}.tar
.github/workflows/workflow-frontend-test.yml (vendored, 70 lines removed)
@@ -1,70 +0,0 @@
name: Frontend - Tests

on:
  workflow_call:
    secrets:
      GITHUB_AUTH_TOKEN:
        description: "The GitHub Token."
        required: true
      CODECOV_TOKEN:
        description: 'Codecov Token'
        required: true

env:
  # to save corepack from itself
  COREPACK_INTEGRITY_KEYS: 0

jobs:
  test:
    name: Frontend - Tests
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Cache Node Modules
        id: cache-node-modules
        uses: actions/cache@v4
        with:
          path: |
            ui/node_modules
          key: modules-${{ hashFiles('ui/package-lock.json') }}

      - name: Cache Playwright Binaries
        id: cache-playwright
        uses: actions/cache@v4
        with:
          path: |
            ~/.cache/ms-playwright
          key: playwright-${{ hashFiles('ui/package-lock.json') }}

      - name: Npm - install
        if: steps.cache-node-modules.outputs.cache-hit != 'true'
        working-directory: ui
        run: npm ci

      - name: Npm - lint
        uses: reviewdog/action-eslint@v1
        with:
          github_token: ${{ secrets.GITHUB_AUTH_TOKEN }}
          reporter: github-pr-review
          workdir: ui

      - name: Npm - Run build
        working-directory: ui
        env:
          CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
        run: npm run build

      - name: Run front-end unit tests
        working-directory: ui
        run: npm run test:unit -- --coverage

      - name: Storybook - Install Playwright
        working-directory: ui
        if: steps.cache-playwright.outputs.cache-hit != 'true'
        run: npx playwright install --with-deps

      - name: Run storybook component tests
        working-directory: ui
        run: npm run test:storybook -- --coverage
.github/workflows/workflow-github-release.yml (vendored)
@@ -1,78 +0,0 @@
name: Github - Release

on:
  workflow_call:
    secrets:
      GH_PERSONAL_TOKEN:
        description: "The Github personal token."
        required: true
  push:
    tags:
      - '*'

jobs:
  publish:
    name: Github - Release
    runs-on: ubuntu-latest
    steps:
      # Check out
      - name: Checkout - Repository
        uses: actions/checkout@v4
        with:
          fetch-depth: 0
          submodules: true

      # Checkout GitHub Actions
      - name: Checkout - Actions
        uses: actions/checkout@v4
        with:
          repository: kestra-io/actions
          sparse-checkout-cone-mode: true
          path: actions
          sparse-checkout: |
            .github/actions

      # Download Exec
      # Must be done after checkout actions
      - name: Artifacts - Download executable
        uses: actions/download-artifact@v4
        if: startsWith(github.ref, 'refs/tags/v')
        with:
          name: exe
          path: build/executable

      - name: Check if current tag is latest
        id: is_latest
        run: |
          latest_tag=$(git tag | grep -E '^v[0-9]+\.[0-9]+\.[0-9]+$' | sed 's/^v//' | sort -V | tail -n1)
          current_tag="${GITHUB_REF_NAME#v}"
          if [ "$current_tag" = "$latest_tag" ]; then
            echo "latest=true" >> $GITHUB_OUTPUT
          else
            echo "latest=false" >> $GITHUB_OUTPUT
          fi
        env:
          GITHUB_REF_NAME: ${{ github.ref_name }}
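The `latest` check leans on GNU version sort; a quick illustration of the ordering it depends on (sample tags are made up):

    $ printf '%s\n' 0.9.2 0.24.2 0.24.14 | sort -V | tail -n1
    0.24.14   # numeric-aware ordering; a plain lexical sort would pick 0.9.2
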
      # GitHub Release
      - name: Create GitHub release
        uses: ./actions/.github/actions/github-release
        if: ${{ startsWith(github.ref, 'refs/tags/v') }}
        env:
          MAKE_LATEST: ${{ steps.is_latest.outputs.latest }}
          GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
          SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}

      # Trigger gha workflow to bump helm chart version
      - name: GitHub - Trigger the Helm chart version bump
        uses: peter-evans/repository-dispatch@v3
        with:
          token: ${{ secrets.GH_PERSONAL_TOKEN }}
          repository: kestra-io/helm-charts
          event-type: update-helm-chart-version
          client-payload: |-
            {
              "new_version": "${{ github.ref_name }}",
              "github_repository": "${{ github.repository }}",
              "github_actor": "${{ github.actor }}"
            }
.github/workflows/workflow-publish-docker.yml (vendored)
@@ -1,146 +0,0 @@
name: Publish - Docker

on:
  workflow_dispatch:
    inputs:
      plugin-version:
        description: "Kestra version"
        default: 'LATEST'
        required: false
        type: string
      force-download-artifact:
        description: 'Force download artifact'
        required: false
        type: string
        default: "true"
  workflow_call:
    inputs:
      plugin-version:
        description: "Kestra version"
        default: 'LATEST'
        required: false
        type: string
      force-download-artifact:
        description: 'Force download artifact'
        required: false
        type: string
        default: "true"
    secrets:
      DOCKERHUB_USERNAME:
        description: "The Dockerhub username."
        required: true
      DOCKERHUB_PASSWORD:
        description: "The Dockerhub password."
        required: true

jobs:
  # ********************************************************************************************************************
  # Build
  # ********************************************************************************************************************
  build-artifacts:
    name: Build Artifacts
    if: ${{ github.event.inputs.force-download-artifact == 'true' }}
    uses: ./.github/workflows/workflow-build-artifacts.yml
    with:
      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
  # ********************************************************************************************************************
  # Docker
  # ********************************************************************************************************************
  publish:
    name: Publish - Docker
    runs-on: ubuntu-latest
    needs: build-artifacts
    if: |
      always() &&
      (needs.build-artifacts.result == 'success' ||
      github.event.inputs.force-download-artifact != 'true')
    env:
      PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
    strategy:
      matrix:
        image:
          - tag: -no-plugins
            packages: jattach
            plugins: false
            python-libraries: ""

          - tag: ""
            plugins: true
            packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
            python-libraries: kestra
    steps:
      - name: Checkout - Current ref
        uses: actions/checkout@v4

      # Docker setup
      - name: Docker - Setup QEMU
        uses: docker/setup-qemu-action@v3

      - name: Docker - Fix Qemu
        shell: bash
        run: |
          docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes

      - name: Docker - Setup Docker Buildx
        uses: docker/setup-buildx-action@v3

      # Docker Login
      - name: Docker - Login to DockerHub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_PASSWORD }}

      # # Get Plugins List
      - name: Plugins - Get List
        uses: ./.github/actions/plugins-list
        id: plugins-list
        if: ${{ matrix.image.plugins}}
        with:
          plugin-version: ${{ env.PLUGIN_VERSION }}

      # Vars
      - name: Docker - Set variables
        shell: bash
        id: vars
        run: |
          TAG=${GITHUB_REF#refs/*/}
          PLUGINS="${{ matrix.image.plugins == true && steps.plugins-list.outputs.plugins || '' }}"
          if [[ $TAG == v* ]]; then
            TAG="${TAG}";
            echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
          elif [[ $TAG = "develop" ]]; then
            TAG="develop";
            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
          else
            TAG="build-${{ github.run_id }}";
            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
          fi

          echo "tag=${TAG}${{ matrix.image.tag }}" >> $GITHUB_OUTPUT

      # Build Docker Image
      - name: Artifacts - Download executable
        uses: actions/download-artifact@v4
        with:
          name: exe
          path: build/executable

      - name: Docker - Copy exe to image
        shell: bash
        run: |
          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra

      # Docker Build and push
      - name: Docker - Build image
        uses: docker/build-push-action@v6
        with:
          context: .
          push: true
          tags: kestra/kestra:${{ steps.vars.outputs.tag }}
          platforms: linux/amd64,linux/arm64
          build-args: |
            KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
            APT_PACKAGES=${{ matrix.image.packages }}
            PYTHON_LIBRARIES=${{ matrix.image.python-libraries }}
.github/workflows/workflow-publish-maven.yml (vendored)
@@ -1,57 +0,0 @@
name: Publish - Maven

on:
  workflow_call:
    secrets:
      SONATYPE_USER:
        description: "The Sonatype username."
        required: true
      SONATYPE_PASSWORD:
        description: "The Sonatype password."
        required: true
      SONATYPE_GPG_KEYID:
        description: "The Sonatype GPG key id."
        required: true
      SONATYPE_GPG_PASSWORD:
        description: "The Sonatype GPG password."
        required: true
      SONATYPE_GPG_FILE:
        description: "The Sonatype GPG file."
        required: true

jobs:
  publish:
    name: Publish - Maven
    runs-on: ubuntu-latest
    steps:
      - name: Checkout - Current ref
        uses: actions/checkout@v4

      # Setup build
      - name: Setup - Build
        uses: kestra-io/actions/.github/actions/setup-build@main
        id: build
        with:
          java-enabled: true
          node-enabled: true

      # Publish
      - name: Publish - Release package to Maven Central
        shell: bash
        env:
          ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
          ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
          SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
          SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
          SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE}}
        run: |
          mkdir -p ~/.gradle/
          echo "signing.keyId=${SONATYPE_GPG_KEYID}" > ~/.gradle/gradle.properties
          echo "signing.password=${SONATYPE_GPG_PASSWORD}" >> ~/.gradle/gradle.properties
          echo "signing.secretKeyRingFile=${HOME}/.gradle/secring.gpg" >> ~/.gradle/gradle.properties
          echo ${SONATYPE_GPG_FILE} | base64 -d > ~/.gradle/secring.gpg
          ./gradlew publishToMavenCentral
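The SONATYPE_GPG_FILE secret is consumed as base64 of a binary keyring; one plausible way to produce such a value locally before storing it as a repository secret (illustration only; the key id is a placeholder):

    gpg --export-secret-keys KEYID > secring.gpg
    base64 -w0 secring.gpg    # paste the single-line output into the secret
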
      # Gradle dependency
      - name: Java - Gradle dependency graph
        uses: gradle/actions/dependency-submission@v4
.github/workflows/workflow-release.yml (vendored)
@@ -1,80 +0,0 @@
name: Release

on:
  workflow_dispatch:
    inputs:
      plugin-version:
        description: "Kestra version"
        default: 'LATEST'
        required: false
        type: string
      publish-docker:
        description: "Publish Docker image"
        default: 'false'
        required: false
        type: string
  workflow_call:
    inputs:
      plugin-version:
        description: "Kestra version"
        default: 'LATEST'
        required: false
        type: string
    secrets:
      DOCKERHUB_USERNAME:
        description: "The Dockerhub username."
        required: true
      DOCKERHUB_PASSWORD:
        description: "The Dockerhub password."
        required: true
      SONATYPE_USER:
        description: "The Sonatype username."
        required: true
      SONATYPE_PASSWORD:
        description: "The Sonatype password."
        required: true
      SONATYPE_GPG_KEYID:
        description: "The Sonatype GPG key id."
        required: true
      SONATYPE_GPG_PASSWORD:
        description: "The Sonatype GPG password."
        required: true
      SONATYPE_GPG_FILE:
        description: "The Sonatype GPG file."
        required: true
jobs:
  build-artifacts:
    name: Build - Artifacts
    uses: ./.github/workflows/workflow-build-artifacts.yml
    with:
      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}

  Docker:
    name: Publish Docker
    needs: build-artifacts
    uses: ./.github/workflows/workflow-publish-docker.yml
    if: startsWith(github.ref, 'refs/heads/develop') || github.event.inputs.publish-docker == 'true'
    with:
      force-download-artifact: 'false'
      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
    secrets:
      DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
      DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}

  Maven:
    name: Publish Maven
    uses: ./.github/workflows/workflow-publish-maven.yml
    secrets:
      SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
      SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
      SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
      SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
      SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}

  Github:
    name: Github Release
    needs: build-artifacts
    if: startsWith(github.ref, 'refs/tags/v')
    uses: ./.github/workflows/workflow-github-release.yml
    secrets:
      GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
.github/workflows/workflow-test.yml (vendored)
@@ -1,97 +0,0 @@
name: Tests

on:
  schedule:
    - cron: '0 4 * * 1,2,3,4,5'
  workflow_call:
    inputs:
      report-status:
        description: "Report status of the jobs in outputs"
        type: string
        required: false
        default: false
    outputs:
      frontend_status:
        description: "Status of the frontend job"
        value: ${{ jobs.set-frontend-status.outputs.frontend_status }}
      backend_status:
        description: "Status of the backend job"
        value: ${{ jobs.set-backend-status.outputs.backend_status }}

jobs:
  file-changes:
    name: File changes detection
    runs-on: ubuntu-latest
    timeout-minutes: 60
    outputs:
      ui: ${{ steps.changes.outputs.ui }}
      backend: ${{ steps.changes.outputs.backend }}
    steps:
      - uses: actions/checkout@v4
        if: "!startsWith(github.ref, 'refs/tags/v')"
      - uses: dorny/paths-filter@v3
        if: "!startsWith(github.ref, 'refs/tags/v')"
        id: changes
        with:
          filters: |
            ui:
              - 'ui/**'
            backend:
              - '!{ui,.github}/**'
          token: ${{ secrets.GITHUB_TOKEN }}

  frontend:
    name: Frontend - Tests
    needs: file-changes
    if: "needs.file-changes.outputs.ui == 'true' || startsWith(github.ref, 'refs/tags/v')"
    uses: ./.github/workflows/workflow-frontend-test.yml
    secrets:
      GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
      CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

  backend:
    name: Backend - Tests
    needs: file-changes
    if: "needs.file-changes.outputs.backend == 'true' || startsWith(github.ref, 'refs/tags/v')"
    uses: ./.github/workflows/workflow-backend-test.yml
    secrets:
      GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
      CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
      SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
      GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}

  # Output every job status
  # To be used in other workflows
  report-status:
    name: Report Status
    runs-on: ubuntu-latest
    needs: [ frontend, backend ]
    if: always() && (inputs.report-status == 'true')
    outputs:
      frontend_status: ${{ steps.set-frontend-status.outputs.frontend_status }}
      backend_status: ${{ steps.set-backend-status.outputs.backend_status }}
    steps:
      - id: set-frontend-status
        name: Set frontend job status
        run: echo "::set-output name=frontend_status::${{ needs.frontend.result }}"

      - id: set-backend-status
        name: Set backend job status
        run: echo "::set-output name=backend_status::${{ needs.backend.result }}"

  notify:
    name: Notify - Slack
    runs-on: ubuntu-latest
    needs: [ frontend, backend ]
    if: github.event_name == 'schedule'
    steps:
      - name: Notify failed CI
        id: send-ci-failed
        if: |
          always() && (needs.frontend.result != 'success' ||
          needs.backend.result != 'success')
        uses: kestra-io/actions/.github/actions/send-ci-failed@main
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
.plugins
@@ -87,13 +87,18 @@
#plugin-powerbi:io.kestra.plugin:plugin-powerbi:LATEST
#plugin-pulsar:io.kestra.plugin:plugin-pulsar:LATEST
#plugin-redis:io.kestra.plugin:plugin-redis:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-bun:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-deno:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-go:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-groovy:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-jbang:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-julia:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-jython:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-lua:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-nashorn:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-node:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-perl:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-php:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-powershell:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-python:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-r:LATEST

Dockerfile.pr (new file)
@@ -0,0 +1,8 @@
ARG KESTRA_DOCKER_BASE_VERSION=develop
FROM kestra/kestra:$KESTRA_DOCKER_BASE_VERSION

USER root

COPY --chown=kestra:kestra docker /

USER kestra
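A plausible local build using this file (illustration; the tag and build-arg value are assumptions, not taken from the repository's workflows):

    docker build -f Dockerfile.pr \
      --build-arg KESTRA_DOCKER_BASE_VERSION=develop \
      -t kestra/kestra:pr-local .
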
build.gradle
@@ -205,23 +205,59 @@ subprojects {
    testImplementation 'org.assertj:assertj-core'
}

test {
    useJUnitPlatform()

def commonTestConfig = { Test t ->
    // set Xmx for test workers
    maxHeapSize = '4g'
    t.maxHeapSize = '4g'

    // configure en_US default locale for tests
    systemProperty 'user.language', 'en'
    systemProperty 'user.country', 'US'
    t.systemProperty 'user.language', 'en'
    t.systemProperty 'user.country', 'US'

    environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
    environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
    environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
    environment 'SECRET_NON_B64_SECRET', "some secret value"
    environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
    environment 'ENV_TEST1', "true"
    environment 'ENV_TEST2', "Pass by env"
    t.environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
    t.environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
    t.environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
    t.environment 'SECRET_NON_B64_SECRET', "some secret value"
    t.environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
    t.environment 'ENV_TEST1', "true"
    t.environment 'ENV_TEST2', "Pass by env"
}

tasks.register('flakyTest', Test) { Test t ->
    group = 'verification'
    description = 'Runs tests tagged @Flaky but does not fail the build.'

    useJUnitPlatform {
        includeTags 'flaky'
    }
    ignoreFailures = true

    reports {
        junitXml.required = true
        junitXml.outputPerTestCase = true
        junitXml.mergeReruns = true
        junitXml.includeSystemErrLog = true
        junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
    }
    commonTestConfig(t)
}

test {
    useJUnitPlatform {
        excludeTags 'flaky'
    }
    reports {
        junitXml.required = true
        junitXml.outputPerTestCase = true
        junitXml.mergeReruns = true
        junitXml.includeSystemErrLog = true
        junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
    }
    commonTestConfig(it)

    finalizedBy(tasks.named('flakyTest'))
}
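Reading of the Gradle change above: the shared `commonTestConfig` closure centralizes heap, locale, and secret-env settings; `flakyTest` runs only tests tagged 'flaky' with `ignoreFailures = true`; and `finalizedBy` chains it after the main suite. A plausible local invocation (assuming standard Gradle task wiring):

    ./gradlew test         # non-flaky tests, then flakyTest runs via finalizedBy
    ./gradlew flakyTest    # only 'flaky'-tagged tests; failures don't break the build
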

testlogger {

@@ -4,10 +4,13 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.IndexerInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.services.SkipExecutionService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import picocli.CommandLine;

import java.util.Collections;
import java.util.List;
import java.util.Map;

@CommandLine.Command(
@@ -17,6 +20,11 @@ import java.util.Map;
public class IndexerCommand extends AbstractServerCommand {
    @Inject
    private ApplicationContext applicationContext;
    @Inject
    private SkipExecutionService skipExecutionService;

    @CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
    private List<String> skipIndexerRecords = Collections.emptyList();

    @SuppressWarnings("unused")
    public static Map<String, Object> propertiesOverrides() {
@@ -27,6 +35,8 @@ public class IndexerCommand extends AbstractServerCommand {

    @Override
    public Integer call() throws Exception {
        this.skipExecutionService.setSkipIndexerRecords(skipIndexerRecords);

        super.call();

        IndexerInterface indexer = applicationContext.getBean(IndexerInterface.class);

@@ -63,6 +63,9 @@ public class StandAloneCommand extends AbstractServerCommand {
    @CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
    private List<String> skipTenants = Collections.emptyList();

    @CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
    private List<String> skipIndexerRecords = Collections.emptyList();

    @CommandLine.Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
    boolean tutorialsDisabled = false;

@@ -93,6 +96,7 @@ public class StandAloneCommand extends AbstractServerCommand {
        this.skipExecutionService.setSkipFlows(skipFlows);
        this.skipExecutionService.setSkipNamespaces(skipNamespaces);
        this.skipExecutionService.setSkipTenants(skipTenants);
        this.skipExecutionService.setSkipIndexerRecords(skipIndexerRecords);
        this.startExecutorService.applyOptions(startExecutors, notStartExecutors);

        KestraContext.getContext().injectWorkerConfigs(workerThread, null);

@@ -5,12 +5,15 @@ import io.kestra.core.models.ServerType;
import io.kestra.core.runners.IndexerInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.services.SkipExecutionService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Option;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

@@ -28,11 +31,17 @@ public class WebServerCommand extends AbstractServerCommand {
    @Inject
    private ExecutorsUtils executorsUtils;

    @Inject
    private SkipExecutionService skipExecutionService;

    @Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
    boolean tutorialsDisabled = false;
    private boolean tutorialsDisabled = false;

    @Option(names = {"--no-indexer"}, description = "Flag to disable starting an embedded indexer.")
    boolean indexerDisabled = false;
    private boolean indexerDisabled = false;

    @CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
    private List<String> skipIndexerRecords = Collections.emptyList();

    @Override
    public boolean isFlowAutoLoadEnabled() {
@@ -48,6 +57,8 @@ public class WebServerCommand extends AbstractServerCommand {

    @Override
    public Integer call() throws Exception {
        this.skipExecutionService.setSkipIndexerRecords(skipIndexerRecords);

        super.call();

        // start the indexer

@@ -18,6 +18,10 @@ micronaut:
      root:
        paths: classpath:root
        mapping: /**
  codec:
    json:
      additional-types:
        - application/scim+json
  server:
    max-request-size: 10GB
    multipart:
@@ -78,8 +82,19 @@ micronaut:
      type: scheduled
      core-pool-size: 1

  # Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
  metrics:
    binders:
      retry:
        enabled: true
      netty:
        queues:
          enabled: true
        bytebuf-allocators:
          enabled: true
        channels:
          enabled: true

    # Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
    export:
      otlp:
        enabled: false
@@ -92,6 +107,8 @@ jackson:
  serialization-inclusion: non_null
  deserialization:
    FAIL_ON_UNKNOWN_PROPERTIES: false
  mapper:
    ACCEPT_CASE_INSENSITIVE_ENUMS: true

endpoints:
  all:
@@ -100,6 +117,10 @@ endpoints:
    sensitive: false
  health:
    details-visible: ANONYMOUS
    disk-space:
      enabled: false
  discovery-client:
    enabled: false
  loggers:
    write-sensitive: false
env:
@@ -133,12 +154,46 @@ kestra:
    tutorial-flows:
      # Automatically loads all tutorial flows at startup.
      enabled: true

  retries:
    attempts: 5
    multiplier: 2.0
    delay: 1s
    maxDelay: ""
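A worked reading of those retry defaults (an interpretation of the values above, assuming the multiplier compounds the delay between attempts): the five attempts would be separated by waits of roughly 1s, 2s, 4s, and 8s, with no upper bound since maxDelay is empty.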

  server:
    basic-auth:
      # These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
      open-urls:
        - "/ping"
        - "/api/v1/executions/webhook/"
        - "/api/v1/main/executions/webhook/"
        - "/api/v1/*/executions/webhook/"

    preview:
      initial-rows: 100
      max-rows: 5000

    # The expected time for this server to complete all its tasks before initiating a graceful shutdown.
    terminationGracePeriod: 5m
    workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
    # Configuration for Liveness and Heartbeat mechanism between servers.
    liveness:
      enabled: true
      # The expected time between liveness probe.
      interval: 10s
      # The timeout used to detect service failures.
      timeout: 1m
      # The time to wait before executing a liveness probe.
      initialDelay: 1m
      # The expected time between service heartbeats.
      heartbeatInterval: 3s
    service:
      purge:
        initial-delay: 1h
        fixed-delay: 1d
        retention: 30d

  jdbc:
    queues:
      min-poll-interval: 25ms
@@ -150,7 +205,7 @@ kestra:
      fixed-delay: 1h
      retention: 7d
      types:
        - type : io.kestra.core.models.executions.LogEntry
        - type: io.kestra.core.models.executions.LogEntry
          retention: 1h
        - type: io.kestra.core.models.executions.MetricEntry
          retention: 1h
@@ -182,37 +237,12 @@ kestra:
    traces:
      root: DISABLED

  server:
    basic-auth:
      # These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
      open-urls:
        - "/ping"
        - "/api/v1/executions/webhook/"
    preview:
      initial-rows: 100
      max-rows: 5000
    # The expected time for this server to complete all its tasks before initiating a graceful shutdown.
    terminationGracePeriod: 5m
    workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
    # Configuration for Liveness and Heartbeat mechanism between servers.
    liveness:
      enabled: true
      # The expected time between liveness probe.
      interval: 10s
      # The timeout used to detect service failures.
      timeout: 1m
      # The time to wait before executing a liveness probe.
      initialDelay: 1m
      # The expected time between service heartbeats.
      heartbeatInterval: 3s
    service:
      purge:
        initial-delay: 1h
        fixed-delay: 1d
        retention: 30d
  ui-anonymous-usage-report:
    enabled: true

  anonymous-usage-report:
    enabled: true
    uri: https://api.kestra.io/v1/reports/usages
    uri: https://api.kestra.io/v1/reports/server-events
    initial-delay: 5m
    fixed-delay: 1h

@@ -63,6 +63,10 @@ dependencies {
        exclude group: 'com.fasterxml.jackson.core'
    }

    // micrometer
    implementation "io.micronaut.micrometer:micronaut-micrometer-observation"
    implementation 'io.micrometer:micrometer-java21'

    // test
    testAnnotationProcessor project(':processor')
    testImplementation project(':tests')

@@ -122,12 +122,13 @@ public class JsonSchemaGenerator {
        if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
            List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
                .map(JsonNode::asText)
                .toList();
                .collect(Collectors.toList());

            properties.fields().forEachRemaining(e -> {
                int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
                if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
                    requiredPropsNode.remove(indexInRequiredArray);
                    requiredFieldValues.remove(indexInRequiredArray);
                }
            });

@@ -6,8 +6,14 @@ import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.http.client.apache.*;
import io.kestra.core.http.client.configurations.HttpConfiguration;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.micrometer.common.KeyValues;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.ApacheHttpClientContext;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.DefaultApacheHttpClientObservationConvention;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.ObservationExecChainHandler;
import io.micrometer.observation.ObservationRegistry;
import io.micronaut.http.MediaType;
import jakarta.annotation.Nullable;
import lombok.Builder;
@@ -16,6 +22,7 @@ import org.apache.commons.lang3.ArrayUtils;
import org.apache.hc.client5.http.ContextBuilder;
import org.apache.hc.client5.http.auth.*;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.impl.ChainElement;
import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
@@ -50,11 +57,16 @@ public class HttpClient implements Closeable {
    private transient CloseableHttpClient client;
    private final RunContext runContext;
    private final HttpConfiguration configuration;
    private ObservationRegistry observationRegistry;

    @Builder
    public HttpClient(RunContext runContext, @Nullable HttpConfiguration configuration) throws IllegalVariableEvaluationException {
        this.runContext = runContext;
        this.configuration = configuration == null ? HttpConfiguration.builder().build() : configuration;
        if (runContext instanceof DefaultRunContext defaultRunContext) {
            this.observationRegistry = defaultRunContext.getApplicationContext().findBean(ObservationRegistry.class).orElse(null);
        }

        this.client = this.createClient();
    }

@@ -67,6 +79,13 @@ public class HttpClient implements Closeable {
            .disableDefaultUserAgent()
            .setUserAgent("Kestra");

        if (observationRegistry != null) {
            // micrometer, must be placed before the retry strategy (see https://docs.micrometer.io/micrometer/reference/reference/httpcomponents.html#_retry_strategy_considerations)
            builder.addExecInterceptorAfter(ChainElement.RETRY.name(), "micrometer",
                new ObservationExecChainHandler(observationRegistry, new CustomApacheHttpClientObservationConvention())
            );
        }

        // logger
        if (this.configuration.getLogs() != null && this.configuration.getLogs().length > 0) {
            if (ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.REQUEST_HEADERS) ||
@@ -297,4 +316,14 @@ public class HttpClient implements Closeable {
            this.client.close();
        }
    }

    public static class CustomApacheHttpClientObservationConvention extends DefaultApacheHttpClientObservationConvention {
        @Override
        public KeyValues getLowCardinalityKeyValues(ApacheHttpClientContext context) {
            return KeyValues.concat(
                super.getLowCardinalityKeyValues(context),
                KeyValues.of("type", "core-client")
            );
        }
    }
}

@@ -0,0 +1,34 @@
package io.kestra.core.metrics;

import io.micrometer.core.instrument.binder.jvm.JvmThreadDeadlockMetrics;
import io.micrometer.java21.instrument.binder.jdk.VirtualThreadMetrics;
import io.micronaut.configuration.metrics.annotation.RequiresMetrics;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;

import static io.micronaut.configuration.metrics.micrometer.MeterRegistryFactory.MICRONAUT_METRICS_BINDERS;
import static io.micronaut.core.util.StringUtils.FALSE;

@Factory
@RequiresMetrics

public class MeterRegistryBinderFactory {
    @Bean
    @Primary
    @Singleton
    @Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
    public VirtualThreadMetrics virtualThreadMetrics() {
        return new VirtualThreadMetrics();
    }

    @Bean
    @Primary
    @Singleton
    @Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
    public JvmThreadDeadlockMetrics threadDeadlockMetricsMetrics() {
        return new JvmThreadDeadlockMetrics();
    }
}
@@ -1,13 +1,15 @@
package io.kestra.core.models;

import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.NotEmpty;

import java.util.*;
import java.util.stream.Collectors;

public record Label(@NotNull String key, @NotNull String value) {
@Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.")
public record Label(@NotEmpty String key, @NotEmpty String value) {
    public static final String SYSTEM_PREFIX = "system.";

    // system labels

@@ -272,7 +272,7 @@ public class Execution implements DeletedInterface, TenantInterface {
    }

    public Execution withTaskRun(TaskRun taskRun) throws InternalException {
        ArrayList<TaskRun> newTaskRunList = new ArrayList<>(this.taskRunList);
        ArrayList<TaskRun> newTaskRunList = this.taskRunList == null ? new ArrayList<>() : new ArrayList<>(this.taskRunList);

        boolean b = Collections.replaceAll(
            newTaskRunList,
@@ -1040,6 +1040,16 @@ public class Execution implements DeletedInterface, TenantInterface {
        return result;
    }

    /**
     * Find all children of this {@link TaskRun}.
     */
    public List<TaskRun> findChildren(TaskRun parentTaskRun) {
        return taskRunList.stream()
            .filter(taskRun -> parentTaskRun.getId().equals(taskRun.getParentTaskRunId()))
            .toList();
    }

    public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
        return (withCurrent ?
            Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :

@@ -3,7 +3,6 @@ package io.kestra.core.models.flows;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
@@ -59,7 +58,14 @@ public abstract class AbstractFlow implements FlowInterface {

    @JsonSerialize(using = ListOrMapOfLabelSerializer.class)
    @JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
    @Schema(implementation = Object.class, oneOf = {List.class, Map.class})
    @Schema(
        description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
        oneOf = {
            Label[].class,
            Map.class
        }
    )
    @Valid
    List<Label> labels;

    @Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
@@ -67,4 +73,5 @@ public abstract class AbstractFlow implements FlowInterface {

    @Valid
    private WorkerGroup workerGroup;

}

@@ -116,7 +116,7 @@ public class State {
    }

    public Instant maxDate() {
        if (this.histories.size() == 0) {
        if (this.histories.isEmpty()) {
            return Instant.now();
        }

@@ -124,7 +124,7 @@ public class State {
    }

    public Instant minDate() {
        if (this.histories.size() == 0) {
        if (this.histories.isEmpty()) {
            return Instant.now();
        }

@@ -173,6 +173,11 @@ public class State {
        return this.current.isBreakpoint();
    }

    @JsonIgnore
    public boolean isQueued() {
        return this.current.isQueued();
    }

    @JsonIgnore
    public boolean isRetrying() {
        return this.current.isRetrying();
@@ -206,6 +211,14 @@ public class State {
        return this.histories.get(this.histories.size() - 2).state.isPaused();
    }

    /**
     * Return true if the execution has failed, then was restarted.
     * This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
     */
    public boolean failedThenRestarted() {
        return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
    }

    @Introspected
    public enum Type {
        CREATED,
@@ -264,6 +277,10 @@ public class State {
        return this == Type.KILLED;
    }

    public boolean isQueued(){
        return this == Type.QUEUED;
    }

    /**
     * @return states that are terminal to an execution
     */

@@ -68,6 +68,19 @@
    String getExpression() {
        return expression;
    }

    /**
     * Returns a new {@link Property} with no cached rendered value,
     * so that the next render will evaluate its original Pebble expression.
     * <p>
     * The returned property will still cache its rendered result.
     * To re-evaluate on a subsequent render, call {@code skipCache()} again.
     *
     * @return a new {@link Property} without a pre-rendered value
     */
    public Property<T> skipCache() {
        return Property.ofExpression(expression);
    }

    /**
     * Build a new Property object with a value already set.<br>

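A sketch of how the new skipCache might be used from task code (the property name is hypothetical; skipCache() and the render/as chain follow the Property and RunContextProperty APIs shown in this diff):

    // Without skipCache(), a second render returns the cached first result;
    // skipCache() returns a Property that re-evaluates the original Pebble expression.
    Property<String> fresh = this.message.skipCache();   // 'message' is illustrative only
    String value = runContext.render(fresh).as(String.class).orElse(null);
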
@@ -29,6 +29,7 @@ public interface QueueFactoryInterface {
    String CLUSTER_EVENT_NAMED = "clusterEventQueue";
    String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
    String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
    String MULTIPLE_CONDITION_EVENT_NAMED = "multipleConditionEventQueue";

    QueueInterface<Execution> execution();

@@ -61,4 +62,6 @@ public interface QueueFactoryInterface {
    QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();

    QueueInterface<ExecutionRunning> executionRunning();

    QueueInterface<MultipleConditionEvent> multipleConditionEvent();
}

@@ -5,6 +5,7 @@ import io.kestra.core.models.Pauseable;
import io.kestra.core.utils.Either;

import java.io.Closeable;
import java.util.List;
import java.util.function.Consumer;

public interface QueueInterface<T> extends Closeable, Pauseable {
@@ -18,7 +19,15 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
        emitAsync(null, message);
    }

    void emitAsync(String consumerGroup, T message) throws QueueException;
    default void emitAsync(String consumerGroup, T message) throws QueueException {
        emitAsync(consumerGroup, List.of(message));
    }

    default void emitAsync(List<T> messages) throws QueueException {
        emitAsync(null, messages);
    }

    void emitAsync(String consumerGroup, List<T> messages) throws QueueException;

    default void delete(T message) throws QueueException {
        delete(null, message);
@@ -27,7 +36,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
    void delete(String consumerGroup, T message) throws QueueException;

    default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
        return receive((String) null, consumer);
        return receive(null, consumer, false);
    }

    default Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {

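Net effect of the QueueInterface change, as read from the defaults above: the single-message emitAsync variants become default methods that wrap the payload via List.of(message), so implementations now only provide the batch signature while existing single-message call sites keep working unchanged.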
@@ -0,0 +1,12 @@
package io.kestra.core.queues;

import java.io.Serial;

public class UnsupportedMessageException extends QueueException {
    @Serial
    private static final long serialVersionUID = 1L;

    public UnsupportedMessageException(String message, Throwable cause) {
        super(message, cause);
    }
}
@@ -106,6 +106,8 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex

    Integer purge(Execution execution);

    Integer purge(List<Execution> executions);

    List<DailyExecutionStatistics> dailyStatisticsForAllTenants(
        @Nullable String query,
        @Nullable String namespace,
@@ -161,7 +163,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
    }

    List<Execution> lastExecutions(
        @Nullable String tenantId,
        String tenantId,
        @Nullable List<FlowFilter> flows
    );
}

@@ -25,8 +25,8 @@ public interface FlowRepositoryInterface {
     * Used only if result is used internally and not exposed to the user.
     * It is useful when we want to restart/resume a flow.
     */
    default Flow findByExecutionWithoutAcl(Execution execution) {
        Optional<Flow> find = this.findByIdWithoutAcl(
    default FlowWithSource findByExecutionWithoutAcl(Execution execution) {
        Optional<FlowWithSource> find = this.findByIdWithSourceWithoutAcl(
            execution.getTenantId(),
            execution.getNamespace(),
            execution.getFlowId(),

@@ -94,6 +94,8 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry

    Integer purge(Execution execution);

    Integer purge(List<Execution> executions);

    void deleteByQuery(String tenantId, String executionId, String taskId, String taskRunId, Level minLevel, Integer attempt);

    void deleteByQuery(String tenantId, String namespace, String flowId, String triggerId);

@@ -29,6 +29,8 @@ public interface MetricRepositoryInterface extends SaveRepositoryInterface<Metri

    Integer purge(Execution execution);

    Integer purge(List<Execution> executions);

    Flux<MetricEntry> findAllAsync(@Nullable String tenantId);

    default Function<String, String> sortMapping() throws IllegalArgumentException {

@@ -86,7 +86,7 @@ public class Executor {

    public Boolean canBeProcessed() {
        return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
            this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint());
            this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
    }

    public Executor withFlow(FlowWithSource flow) {

@@ -237,9 +237,9 @@ public class ExecutorService {
        try {
            state = flowableParent.resolveState(runContext, execution, parentTaskRun);
        } catch (Exception e) {
            // This will lead to the next task being still executed but at least Kestra will not crash.
            // This will lead to the next task being still executed, but at least Kestra will not crash.
            // This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
            runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), e);
            runContext.logger().error("Unable to resolve state from the Flowable task: {}", e.getMessage(), e);
            state = Optional.of(State.Type.FAILED);
        }
        Optional<WorkerTaskResult> endedTask = childWorkerTaskTypeToWorkerTask(
@@ -589,6 +589,23 @@ public class ExecutorService {
            list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId()))
                .collect(Collectors.toCollection(ArrayList::new));
        }

        // If the task is a flowable and its terminated, check that all children are terminated.
        // This may not be the case for parallel flowable tasks like Parallel, Dag, ForEach...
        // After a fail task, some child flowable may not be correctly terminated.
        if (task instanceof FlowableTask<?> && taskRun.getState().isTerminated()) {
            List<TaskRun> updated = executor.getExecution().findChildren(taskRun).stream()
                .filter(child -> !child.getState().isTerminated())
                .map(throwFunction(child -> child.withState(taskRun.getState().getCurrent())))
                .toList();
            if (!updated.isEmpty()) {
                Execution execution = executor.getExecution();
                for (TaskRun child : updated) {
                    execution = execution.withTaskRun(child);
                }
                executor = executor.withExecution(execution, "handledTerminatedFlowableTasks");
            }
        }
    }

    metricRegistry
@@ -1045,6 +1062,17 @@ public class ExecutorService {
        var executionUpdatingTask = (ExecutionUpdatableTask) workerTask.getTask();

        try {
            // handle runIf
            if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
                executor.withExecution(
                    executor
                        .getExecution()
                        .withTaskRun(workerTask.getTaskRun().withState(State.Type.SKIPPED)),
                    "handleExecutionUpdatingTaskSkipped"
                );
                return false;
            }

            executor.withExecution(
                executionUpdatingTask.update(executor.getExecution(), workerTask.getRunContext())
                    .withTaskRun(workerTask.getTaskRun().withState(State.Type.RUNNING)),
@@ -1144,7 +1172,7 @@ public class ExecutorService {
            }
        }

        return taskRuns.size() > execution.getTaskRunList().size() ? execution.withTaskRunList(taskRuns) : null;
        return taskRuns.size() > ListUtils.emptyOnNull(execution.getTaskRunList()).size() ? execution.withTaskRunList(taskRuns) : null;
    }

    public boolean canBePurged(final Executor executor) {

@@ -82,6 +82,8 @@ public abstract class FilesService {
    }

    private static String resolveUniqueNameForFile(final Path path) {
        return IdUtils.from(path.toString()) + "-" + path.toFile().getName();
        String filename = path.getFileName().toString();
        String encodedFilename = java.net.URLEncoder.encode(filename, java.nio.charset.StandardCharsets.UTF_8);
        return IdUtils.from(path.toString()) + "-" + encodedFilename;
    }
}

@@ -463,7 +463,7 @@ public class FlowableUtils {

        ArrayList<ResolvedTask> result = new ArrayList<>();

        int index = 0;
        int iteration = 0;
        for (Object current : distinctValue) {
            try {
                String resolvedValue = current instanceof String stringValue ? stringValue : MAPPER.writeValueAsString(current);
@@ -471,7 +471,7 @@ public class FlowableUtils {
                result.add(ResolvedTask.builder()
                    .task(task)
                    .value(resolvedValue)
                    .iteration(index++)
                    .iteration(iteration)
                    .parentId(parentTaskRun.getId())
                    .build()
                );
@@ -479,6 +479,7 @@ public class FlowableUtils {
            } catch (JsonProcessingException e) {
                throw new IllegalVariableEvaluationException(e);
            }
            iteration++;
        }

        return result;

@@ -0,0 +1,13 @@
package io.kestra.core.runners;

import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.utils.IdUtils;

public record MultipleConditionEvent(Flow flow, Execution execution) implements HasUID {
    @Override
    public String uid() {
        return IdUtils.fromParts(flow.uidWithoutRevision(), execution.getId());
    }
}
@@ -29,7 +29,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;

public class RunContextLogger implements Supplier<org.slf4j.Logger> {
    private static final int MAX_MESSAGE_LENGTH = 1024 * 10;
    private static final int MAX_MESSAGE_LENGTH = 1024 * 15;
    public static final String ORIGINAL_TIMESTAMP_KEY = "originalTimestamp";

    private final String loggerName;
@@ -80,7 +80,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
        }

        List<LogEntry> result = new ArrayList<>();
        long i = 0;
        for (String s : split) {
            result.add(LogEntry.builder()
                .namespace(logEntry.getNamespace())
@@ -98,7 +97,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
                .thread(event.getThreadName())
                .build()
            );
            i++;
        }

        return result;
@@ -331,14 +329,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
        protected void append(ILoggingEvent e) {
            e = this.transform(e);

            logEntries(e, logEntry)
                .forEach(l -> {
                    try {
                        logQueue.emitAsync(l);
                    } catch (QueueException ex) {
                        log.warn("Unable to emit logQueue", ex);
                    }
                });
            try {
                logQueue.emitAsync(logEntries(e, logEntry));
            } catch (QueueException ex) {
                log.warn("Unable to emit logQueue", ex);
            }
        }
    }

@@ -4,15 +4,11 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static io.kestra.core.utils.Rethrow.throwFunction;

@@ -27,12 +23,19 @@ public class RunContextProperty<T> {
    private final RunContext runContext;
    private final Task task;
    private final AbstractTrigger trigger;

    private final boolean skipCache;

    RunContextProperty(Property<T> property, RunContext runContext) {
        this(property, runContext, false);
    }

    RunContextProperty(Property<T> property, RunContext runContext, boolean skipCache) {
        this.property = property;
        this.runContext = runContext;
        this.task = ((DefaultRunContext) runContext).getTask();
        this.trigger = ((DefaultRunContext) runContext).getTrigger();
        this.skipCache = skipCache;
    }

    private void validate() {
@@ -45,6 +48,19 @@ public class RunContextProperty<T> {
            log.trace("Unable to do validation: no task or trigger found");
        }
    }

    /**
     * Returns a new {@link RunContextProperty} that will always be rendered by evaluating
     * its original Pebble expression, without using any previously cached value.
     * <p>
     * This ensures that each time the property is rendered, the underlying
     * expression is re-evaluated to produce a fresh result.
     *
     * @return a new {@link Property} that bypasses the cache
     */
    public RunContextProperty<T> skipCache() {
        return new RunContextProperty<>(this.property, this.runContext, true);
    }

    /**
     * Render a property then convert it to its target type and validate it.<br>
@@ -55,13 +71,13 @@ public class RunContextProperty<T> {
     * Warning, due to the caching mechanism, this method is not thread-safe.
     */
    public Optional<T> as(Class<T> clazz) throws IllegalVariableEvaluationException {
        var as = Optional.ofNullable(this.property)
        var as = Optional.ofNullable(getProperty())
            .map(throwFunction(prop -> Property.as(prop, this.runContext, clazz)));

        validate();
        return as;
    }

    /**
     * Render a property with additional variables, then convert it to its target type and validate it.<br>
     *
@@ -71,7 +87,7 @@ public class RunContextProperty<T> {
     * Warning, due to the caching mechanism, this method is not thread-safe.
     */
    public Optional<T> as(Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
        var as = Optional.ofNullable(this.property)
        var as = Optional.ofNullable(getProperty())
            .map(throwFunction(prop -> Property.as(prop, this.runContext, clazz, variables)));

        validate();
@@ -89,7 +105,7 @@ public class RunContextProperty<T> {
     */
    @SuppressWarnings("unchecked")
    public <I> T asList(Class<I> itemClazz) throws IllegalVariableEvaluationException {
        var as = Optional.ofNullable(this.property)
        var as = Optional.ofNullable(getProperty())
            .map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz)))
            .orElse((T) Collections.emptyList());

@@ -108,7 +124,7 @@ public class RunContextProperty<T> {
     */
    @SuppressWarnings("unchecked")
    public <I> T asList(Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
        var as = Optional.ofNullable(this.property)
        var as = Optional.ofNullable(getProperty())
            .map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz, variables)))
            .orElse((T) Collections.emptyList());

@@ -127,7 +143,7 @@ public class RunContextProperty<T> {
     */
    @SuppressWarnings("unchecked")
    public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
        var as = Optional.ofNullable(this.property)
        var as = Optional.ofNullable(getProperty())
            .map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass)))
            .orElse((T) Collections.emptyMap());

@@ -146,11 +162,15 @@ public class RunContextProperty<T> {
     */
    @SuppressWarnings("unchecked")
    public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
        var as = Optional.ofNullable(this.property)
        var as = Optional.ofNullable(getProperty())
            .map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass, variables)))
            .orElse((T) Collections.emptyMap());

        validate();
        return as;
    }

    private Property<T> getProperty() {
        return skipCache ? this.property.skipCache() : this.property;
    }
}

@@ -45,7 +45,7 @@ final class Secret {
|
||||
for (var entry: data.entrySet()) {
|
||||
if (entry.getValue() instanceof Map map) {
|
||||
// if some values are of type EncryptedString, we decrypt them and replace the object
|
||||
if (EncryptedString.TYPE.equalsIgnoreCase((String)map.get("type"))) {
|
||||
if (map.get("type") instanceof String typeStr && EncryptedString.TYPE.equalsIgnoreCase(typeStr)) {
|
||||
try {
|
||||
String decoded = decrypt((String) map.get("value"));
|
||||
decryptedMap.put(entry.getKey(), decoded);
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.runners.pebble.PebbleEngineFactory;
|
||||
import io.kestra.core.runners.pebble.functions.SecretFunction;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import java.util.List;
|
||||
|
||||
@Singleton
|
||||
public class SecureVariableRendererFactory {
|
||||
|
||||
private final PebbleEngineFactory pebbleEngineFactory;
|
||||
private final ApplicationContext applicationContext;
|
||||
|
||||
private VariableRenderer secureVariableRenderer;
|
||||
|
||||
@Inject
|
||||
public SecureVariableRendererFactory(ApplicationContext applicationContext, PebbleEngineFactory pebbleEngineFactory) {
|
||||
this.pebbleEngineFactory = pebbleEngineFactory;
|
||||
this.applicationContext = applicationContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates or returns the existing secured {@link VariableRenderer} instance.
|
||||
*
|
||||
* @return the secured {@link VariableRenderer} instance
|
||||
*/
|
||||
public synchronized VariableRenderer createOrGet() {
|
||||
if (this.secureVariableRenderer == null) {
|
||||
// Explicitly create a new instance through the application context to ensure
|
||||
// any custom VariableRenderer implementation is used
|
||||
secureVariableRenderer = applicationContext.createBean(VariableRenderer.class);
|
||||
secureVariableRenderer.setPebbleEngine(pebbleEngineFactory.createWithMaskedFunctions(secureVariableRenderer, List.of(SecretFunction.NAME)));
|
||||
}
|
||||
return secureVariableRenderer;
|
||||
}
|
||||
}
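A minimal usage sketch, assuming this factory is injected into another Micronaut bean and that VariableRenderer exposes its usual render(String, Map) entry point (the method name below is illustrative):

    @Inject
    SecureVariableRendererFactory secureVariableRendererFactory;

    String renderMasked(Map<String, Object> variables) throws IllegalVariableEvaluationException {
        VariableRenderer renderer = secureVariableRendererFactory.createOrGet();
        // The output of secret() is replaced by "******" by the masked-function proxy.
        return renderer.render("{{ secret('API_KEY') }}", variables);
    }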
|
||||
|
||||
@@ -2,121 +2,44 @@ package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.pebble.*;
|
||||
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.ConfigurationProperties;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.pebbletemplates.pebble.PebbleEngine;
|
||||
import io.pebbletemplates.pebble.error.AttributeNotFoundException;
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
import io.pebbletemplates.pebble.extension.Extension;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplate;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.*;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Singleton
|
||||
public class VariableRenderer {
|
||||
private static final Pattern RAW_PATTERN = Pattern.compile("(\\{%-*\\s*raw\\s*-*%}(.*?)\\{%-*\\s*endraw\\s*-*%})");
|
||||
public static final int MAX_RENDERING_AMOUNT = 100;
|
||||
|
||||
private final PebbleEngine pebbleEngine;
|
||||
private PebbleEngine pebbleEngine;
|
||||
private final VariableConfiguration variableConfiguration;
|
||||
|
||||
@Inject
|
||||
public VariableRenderer(ApplicationContext applicationContext, @Nullable VariableConfiguration variableConfiguration) {
|
||||
this(applicationContext, variableConfiguration, Collections.emptyList());
|
||||
this(applicationContext.getBean(PebbleEngineFactory.class), variableConfiguration);
|
||||
}
|
||||
|
||||
public VariableRenderer(ApplicationContext applicationContext, @Nullable VariableConfiguration variableConfiguration, List<String> functionsToMask) {
|
||||
|
||||
public VariableRenderer(PebbleEngineFactory pebbleEngineFactory, @Nullable VariableConfiguration variableConfiguration) {
|
||||
this.variableConfiguration = variableConfiguration != null ? variableConfiguration : new VariableConfiguration();
|
||||
|
||||
PebbleEngine.Builder pebbleBuilder = new PebbleEngine.Builder()
|
||||
.registerExtensionCustomizer(ExtensionCustomizer::new)
|
||||
.strictVariables(true)
|
||||
.cacheActive(this.variableConfiguration.getCacheEnabled())
|
||||
.newLineTrimming(false)
|
||||
.autoEscaping(false);
|
||||
|
||||
List<Extension> extensions = applicationContext.getBeansOfType(Extension.class).stream()
|
||||
.map(e -> functionsToMask.stream().anyMatch(excludedFunction -> e.getFunctions().containsKey(excludedFunction))
|
||||
? extensionWithMaskedFunctions(e, functionsToMask)
|
||||
: e)
|
||||
.toList();
|
||||
|
||||
extensions.forEach(pebbleBuilder::extension);
|
||||
|
||||
if (this.variableConfiguration.getCacheEnabled()) {
|
||||
pebbleBuilder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
|
||||
}
|
||||
|
||||
this.pebbleEngine = pebbleBuilder.build();
|
||||
this.pebbleEngine = pebbleEngineFactory.create();
|
||||
}
|
||||
|
||||
private Extension extensionWithMaskedFunctions(Extension initialExtension, List<String> maskedFunctions) {
|
||||
return (Extension) Proxy.newProxyInstance(
|
||||
initialExtension.getClass().getClassLoader(),
|
||||
new Class[]{Extension.class},
|
||||
(proxy, method, methodArgs) -> {
|
||||
if (method.getName().equals("getFunctions")) {
|
||||
return initialExtension.getFunctions().entrySet().stream()
|
||||
.map(entry -> {
|
||||
if (maskedFunctions.contains(entry.getKey())) {
|
||||
return Map.entry(entry.getKey(), this.maskedFunctionProxy(entry.getValue()));
|
||||
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
|
||||
return Map.entry(entry.getKey(), this.variableRendererProxy(entry.getValue()));
|
||||
}
|
||||
|
||||
return entry;
|
||||
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
}
|
||||
|
||||
return method.invoke(initialExtension, methodArgs);
|
||||
}
|
||||
);
|
||||
|
||||
public void setPebbleEngine(final PebbleEngine pebbleEngine) {
|
||||
this.pebbleEngine = pebbleEngine;
|
||||
}
|
||||
|
||||
private Function variableRendererProxy(Function initialFunction) {
|
||||
return (Function) Proxy.newProxyInstance(
|
||||
initialFunction.getClass().getClassLoader(),
|
||||
new Class[]{Function.class, RenderingFunctionInterface.class},
|
||||
(functionProxy, functionMethod, functionArgs) -> {
|
||||
if (functionMethod.getName().equals("variableRenderer")) {
|
||||
return this;
|
||||
}
|
||||
return functionMethod.invoke(initialFunction, functionArgs);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private Function maskedFunctionProxy(Function initialFunction) {
|
||||
return (Function) Proxy.newProxyInstance(
|
||||
initialFunction.getClass().getClassLoader(),
|
||||
new Class[]{Function.class},
|
||||
(functionProxy, functionMethod, functionArgs) -> {
|
||||
Object result;
|
||||
try {
|
||||
result = functionMethod.invoke(initialFunction, functionArgs);
|
||||
} catch (InvocationTargetException e) {
|
||||
throw e.getCause();
|
||||
}
|
||||
if (functionMethod.getName().equals("execute")) {
|
||||
return "******";
|
||||
}
|
||||
return result;
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
public static IllegalVariableEvaluationException properPebbleException(PebbleException initialExtension) {
|
||||
if (initialExtension instanceof AttributeNotFoundException current) {
|
||||
return new IllegalVariableEvaluationException(
|
||||
|
||||
@@ -508,14 +508,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
Execution execution = workerTrigger.getTrigger().isFailOnTriggerError() ? TriggerService.generateExecution(workerTrigger.getTrigger(), workerTrigger.getConditionContext(), workerTrigger.getTriggerContext(), (Output) null)
|
||||
.withState(FAILED) : null;
|
||||
if (execution != null) {
|
||||
RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution))
|
||||
.forEach(log -> {
|
||||
try {
|
||||
logQueue.emitAsync(log);
|
||||
} catch (QueueException ex) {
|
||||
// fail silently
|
||||
}
|
||||
});
|
||||
try {
|
||||
logQueue.emitAsync(RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution)));
|
||||
} catch (QueueException ex) {
|
||||
// fail silently
|
||||
}
|
||||
}
|
||||
this.workerTriggerResultQueue.emit(
|
||||
WorkerTriggerResult.builder()
|
||||
@@ -764,6 +761,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
|
||||
|
||||
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
|
||||
|
||||
this.workerTaskResultQueue.emit(workerTaskResult);
|
||||
|
||||
// upload the cache file; the hash may not be present if we didn't succeed in computing it
|
||||
@@ -796,6 +794,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
// If the message is too big, we remove the outputs
|
||||
failed = failed.withOutputs(Variables.empty());
|
||||
}
|
||||
if (e instanceof UnsupportedMessageException) {
|
||||
// we expect the offending char to be in the outputs, so we remove them
|
||||
failed = failed.withOutputs(Variables.empty());
|
||||
}
|
||||
WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
|
||||
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
|
||||
contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
|
||||
@@ -818,7 +820,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
private Optional<String> hashTask(RunContext runContext, Task task) {
|
||||
try {
|
||||
var map = JacksonMapper.toMap(task);
|
||||
var rMap = runContext.render(map);
|
||||
// If there are task-provided variables, rendering the task may fail.
|
||||
// The best we can do is to add a fake 'workingDir' as it's an often-added variable,
|
||||
// and it should not be part of the task hash.
|
||||
Map<String, Object> variables = Map.of("workingDir", "workingDir");
|
||||
var rMap = runContext.render(map, variables);
|
||||
var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
|
||||
MessageDigest digest = MessageDigest.getInstance("SHA-256");
|
||||
digest.update(json);
|
||||
|
||||
@@ -0,0 +1,118 @@
|
||||
package io.kestra.core.runners.pebble;
|
||||
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.pebbletemplates.pebble.PebbleEngine;
|
||||
import io.pebbletemplates.pebble.extension.Extension;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Singleton
|
||||
public class PebbleEngineFactory {
|
||||
|
||||
private final ApplicationContext applicationContext;
|
||||
private final VariableRenderer.VariableConfiguration variableConfiguration;
|
||||
|
||||
@Inject
|
||||
public PebbleEngineFactory(ApplicationContext applicationContext, @Nullable VariableRenderer.VariableConfiguration variableConfiguration) {
|
||||
this.applicationContext = applicationContext;
|
||||
this.variableConfiguration = variableConfiguration;
|
||||
}
|
||||
|
||||
public PebbleEngine create() {
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder();
|
||||
this.applicationContext.getBeansOfType(Extension.class).forEach(builder::extension);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
|
||||
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder();
|
||||
|
||||
this.applicationContext.getBeansOfType(Extension.class).stream()
|
||||
.map(e -> functionsToMask.stream().anyMatch(fun -> e.getFunctions().containsKey(fun))
|
||||
? extensionWithMaskedFunctions(renderer, e, functionsToMask)
|
||||
: e)
|
||||
.forEach(builder::extension);
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private PebbleEngine.Builder newPebbleEngineBuilder() {
|
||||
PebbleEngine.Builder builder = new PebbleEngine.Builder()
|
||||
.registerExtensionCustomizer(ExtensionCustomizer::new)
|
||||
.strictVariables(true)
|
||||
.cacheActive(this.variableConfiguration.getCacheEnabled())
|
||||
.newLineTrimming(false)
|
||||
.autoEscaping(false);
|
||||
|
||||
if (this.variableConfiguration.getCacheEnabled()) {
|
||||
builder = builder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
private Extension extensionWithMaskedFunctions(VariableRenderer renderer, Extension initialExtension, List<String> maskedFunctions) {
|
||||
return (Extension) Proxy.newProxyInstance(
|
||||
initialExtension.getClass().getClassLoader(),
|
||||
new Class[]{Extension.class},
|
||||
(proxy, method, methodArgs) -> {
|
||||
if (method.getName().equals("getFunctions")) {
|
||||
return initialExtension.getFunctions().entrySet().stream()
|
||||
.map(entry -> {
|
||||
if (maskedFunctions.contains(entry.getKey())) {
|
||||
return Map.entry(entry.getKey(), this.maskedFunctionProxy(entry.getValue()));
|
||||
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
|
||||
return Map.entry(entry.getKey(), this.variableRendererProxy(renderer, entry.getValue()));
|
||||
}
|
||||
|
||||
return entry;
|
||||
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
}
|
||||
|
||||
return method.invoke(initialExtension, methodArgs);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private Function variableRendererProxy(VariableRenderer renderer, Function initialFunction) {
|
||||
return (Function) Proxy.newProxyInstance(
|
||||
initialFunction.getClass().getClassLoader(),
|
||||
new Class[]{Function.class, RenderingFunctionInterface.class},
|
||||
(functionProxy, functionMethod, functionArgs) -> {
|
||||
if (functionMethod.getName().equals("variableRenderer")) {
|
||||
return renderer;
|
||||
}
|
||||
return functionMethod.invoke(initialFunction, functionArgs);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private Function maskedFunctionProxy(Function initialFunction) {
|
||||
return (Function) Proxy.newProxyInstance(
|
||||
initialFunction.getClass().getClassLoader(),
|
||||
new Class[]{Function.class},
|
||||
(functionProxy, functionMethod, functionArgs) -> {
|
||||
Object result;
|
||||
try {
|
||||
result = functionMethod.invoke(initialFunction, functionArgs);
|
||||
} catch (InvocationTargetException e) {
|
||||
throw e.getCause();
|
||||
}
|
||||
if (functionMethod.getName().equals("execute")) {
|
||||
return "******";
|
||||
}
|
||||
return result;
|
||||
}
|
||||
);
|
||||
}
|
||||
}
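A sketch of the difference between the two entry points (the wiring of 'renderer' is assumed; the engine configuration itself is exactly the builder above):

    // Standard engine: all extension functions behave normally.
    PebbleEngine engine = pebbleEngineFactory.create();
    // Masked engine: the listed functions still execute, but their result is replaced by "******".
    PebbleEngine masked = pebbleEngineFactory.createWithMaskedFunctions(renderer, List.of(SecretFunction.NAME));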
|
||||
@@ -1,14 +1,15 @@
|
||||
package io.kestra.core.runners.pebble.filters;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
import io.pebbletemplates.pebble.extension.Filter;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplate;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class ChunkFilter implements Filter {
|
||||
@Override
|
||||
public List<String> getArgumentNames() {
|
||||
@@ -30,6 +31,10 @@ public class ChunkFilter implements Filter {
|
||||
throw new PebbleException(null, "'chunk' filter can only be applied to List. Actual type was: " + input.getClass().getName(), lineNumber, self.getName());
|
||||
}
|
||||
|
||||
return Lists.partition((List) input, ((Long) args.get("size")).intValue());
|
||||
Object sizeObj = args.get("size");
|
||||
if (!(sizeObj instanceof Number)) {
|
||||
throw new PebbleException(null, "'chunk' filter argument 'size' must be a number. Actual type was: " + sizeObj.getClass().getName(), lineNumber, self.getName());
|
||||
}
|
||||
return Lists.partition((List) input, ((Number) sizeObj).intValue());
|
||||
}
|
||||
}
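For clarity, the filter's core call after its type checks is Guava's Lists.partition; chunking a five-element list by 2 behaves as follows:

    // Same call the filter makes once 'input' and 'size' are validated:
    List<List<Integer>> chunks = Lists.partition(List.of(1, 2, 3, 4, 5), 2);
    // chunks -> [[1, 2], [3, 4], [5]]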
|
||||
|
||||
@@ -17,12 +17,17 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class JqFilter implements Filter {
|
||||
private final Scope scope;
|
||||
// Load Scope once as static to avoid repeated initialization
|
||||
// This improves performance by loading builtin functions only once when the class loads
|
||||
private static final Scope SCOPE;
|
||||
private final List<String> argumentNames = new ArrayList<>();
|
||||
|
||||
static {
|
||||
SCOPE = Scope.newEmptyScope();
|
||||
BuiltinFunctionLoader.getInstance().loadFunctions(Versions.JQ_1_6, SCOPE);
|
||||
}
|
||||
|
||||
public JqFilter() {
|
||||
scope = Scope.newEmptyScope();
|
||||
BuiltinFunctionLoader.getInstance().loadFunctions(Versions.JQ_1_6, scope);
|
||||
this.argumentNames.add("expression");
|
||||
}
|
||||
|
||||
@@ -43,10 +48,7 @@ public class JqFilter implements Filter {
|
||||
|
||||
String pattern = (String) args.get("expression");
|
||||
|
||||
Scope rootScope = Scope.newEmptyScope();
|
||||
BuiltinFunctionLoader.getInstance().loadFunctions(Versions.JQ_1_6, rootScope);
|
||||
try {
|
||||
|
||||
JsonQuery q = JsonQuery.compile(pattern, Versions.JQ_1_6);
|
||||
|
||||
JsonNode in;
|
||||
@@ -59,7 +61,7 @@ public class JqFilter implements Filter {
|
||||
final List<Object> out = new ArrayList<>();
|
||||
|
||||
try {
|
||||
q.apply(scope, in, v -> {
|
||||
q.apply(Scope.newChildScope(SCOPE), in, v -> {
|
||||
if (v instanceof TextNode) {
|
||||
out.add(v.textValue());
|
||||
} else if (v instanceof NullNode) {
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.kestra.core.events.CrudEventType;
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
@@ -29,10 +30,7 @@ import io.kestra.core.server.Service;
|
||||
import io.kestra.core.server.ServiceStateChangeEvent;
|
||||
import io.kestra.core.server.ServiceType;
|
||||
import io.kestra.core.services.*;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.Either;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.*;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||
import io.micronaut.core.util.CollectionUtils;
|
||||
@@ -91,7 +89,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
private volatile Boolean isReady = false;
|
||||
|
||||
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
private ScheduledFuture<?> executionMonitorFuture;
|
||||
|
||||
@Getter
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
@@ -152,7 +152,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
this.flowListeners.run();
|
||||
this.flowListeners.listen(this::initializedTriggers);
|
||||
|
||||
ScheduledFuture<?> evaluationLoop = scheduleExecutor.scheduleAtFixedRate(
|
||||
scheduledFuture = scheduleExecutor.scheduleAtFixedRate(
|
||||
this::handle,
|
||||
0,
|
||||
1,
|
||||
@@ -162,10 +162,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
// look at exception on the evaluation loop thread
|
||||
Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
|
||||
() -> {
|
||||
Await.until(evaluationLoop::isDone);
|
||||
Await.until(scheduledFuture::isDone);
|
||||
|
||||
try {
|
||||
evaluationLoop.get();
|
||||
scheduledFuture.get();
|
||||
} catch (CancellationException ignored) {
|
||||
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
@@ -177,7 +177,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
);
|
||||
|
||||
// Periodically report metrics and logs of running executions
|
||||
ScheduledFuture<?> monitoringLoop = executionMonitorExecutor.scheduleWithFixedDelay(
|
||||
executionMonitorFuture = executionMonitorExecutor.scheduleWithFixedDelay(
|
||||
this::executionMonitor,
|
||||
30,
|
||||
10,
|
||||
@@ -187,10 +187,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
// look at exception on the monitoring loop thread
|
||||
Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
|
||||
() -> {
|
||||
Await.until(monitoringLoop::isDone);
|
||||
Await.until(executionMonitorFuture::isDone);
|
||||
|
||||
try {
|
||||
monitoringLoop.get();
|
||||
executionMonitorFuture.get();
|
||||
} catch (CancellationException ignored) {
|
||||
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
@@ -302,6 +302,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
// Initialize the local trigger state,
|
||||
// and if some flows were created out-of-band, for example from the CLI,
|
||||
// then we may have some triggers that are not created yet.
|
||||
/* FIXME: There is a race between Kafka stream consumption & initializedTriggers: we can override a trigger update coming from a stream consumption with an old one because stream consumption is not waiting for trigger initialization
|
||||
* Example: we see a SUCCESS execution so we reset the trigger's executionId, but then initializedTriggers resubmits an old trigger state for some reason (e.g. the evaluationDate). */
|
||||
private void initializedTriggers(List<FlowWithSource> flows) {
|
||||
record FlowAndTrigger(FlowWithSource flow, AbstractTrigger trigger) {
|
||||
@Override
|
||||
@@ -318,7 +320,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
}
|
||||
|
||||
synchronized (this) { // we need a sync block as we read then update so we should not do it in multiple threads concurrently
|
||||
List<Trigger> triggers = triggerState.findAllForAllTenants();
|
||||
Map<String, Trigger> triggers = triggerState.findAllForAllTenants().stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
|
||||
|
||||
flows
|
||||
.stream()
|
||||
@@ -328,7 +330,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.flatMap(flow -> flow.getTriggers().stream().filter(trigger -> trigger instanceof WorkerTriggerInterface).map(trigger -> new FlowAndTrigger(flow, trigger)))
|
||||
.distinct()
|
||||
.forEach(flowAndTrigger -> {
|
||||
Optional<Trigger> trigger = triggers.stream().filter(t -> t.uid().equals(Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger()))).findFirst(); // must have one or none
|
||||
String triggerUid = Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger());
|
||||
Optional<Trigger> trigger = Optional.ofNullable(triggers.get(triggerUid));
|
||||
if (trigger.isEmpty()) {
|
||||
RunContext runContext = runContextFactory.of(flowAndTrigger.flow(), flowAndTrigger.trigger());
|
||||
ConditionContext conditionContext = conditionService.conditionContext(runContext, flowAndTrigger.flow(), null);
|
||||
@@ -371,10 +374,13 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
this.triggerState.update(lastUpdate);
|
||||
}
|
||||
} else if (recoverMissedSchedules == RecoverMissedSchedules.NONE) {
|
||||
lastUpdate = trigger.get().toBuilder().nextExecutionDate(schedule.nextEvaluationDate()).build();
|
||||
} else {
|
||||
ZonedDateTime nextEvaluationDate = schedule.nextEvaluationDate();
|
||||
if (recoverMissedSchedules == RecoverMissedSchedules.NONE && !Objects.equals(trigger.get().getNextExecutionDate(), nextEvaluationDate)) {
|
||||
lastUpdate = trigger.get().toBuilder().nextExecutionDate(nextEvaluationDate).build();
|
||||
|
||||
this.triggerState.update(lastUpdate);
|
||||
this.triggerState.update(lastUpdate);
|
||||
}
|
||||
}
|
||||
// Used for schedulableNextDate
|
||||
FlowWithWorkerTrigger flowWithWorkerTrigger = FlowWithWorkerTrigger.builder()
|
||||
@@ -467,9 +473,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
private List<FlowWithTriggers> computeSchedulable(List<FlowWithSource> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
|
||||
List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
|
||||
List<String> flowIds = flows.stream().map(FlowId::uidWithoutRevision).toList();
|
||||
Map<String, Trigger> triggerById = triggerContextsToEvaluate.stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
|
||||
|
||||
// delete triggers whose flow has been deleted
|
||||
triggerContextsToEvaluate.stream()
|
||||
.filter(trigger -> !flows.stream().map(FlowId::uidWithoutRevision).toList().contains(FlowId.uid(trigger)))
|
||||
.filter(trigger -> !flowIds.contains(FlowId.uid(trigger)))
|
||||
.forEach(trigger -> {
|
||||
try {
|
||||
this.triggerState.delete(trigger);
|
||||
@@ -491,12 +500,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.map(abstractTrigger -> {
|
||||
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
|
||||
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
|
||||
Trigger triggerContext = null;
|
||||
Trigger lastTrigger = triggerContextsToEvaluate
|
||||
.stream()
|
||||
.filter(triggerContextToFind -> triggerContextToFind.uid().equals(Trigger.uid(flow, abstractTrigger)))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
Trigger triggerContext;
|
||||
Trigger lastTrigger = triggerById.get(Trigger.uid(flow, abstractTrigger));
|
||||
// If a trigger is not found in triggers to evaluate, then we ignore it
|
||||
if (lastTrigger == null) {
|
||||
return null;
|
||||
@@ -1006,8 +1011,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
setState(ServiceState.TERMINATING);
|
||||
this.receiveCancellations.forEach(Runnable::run);
|
||||
this.scheduleExecutor.shutdown();
|
||||
this.executionMonitorExecutor.shutdown();
|
||||
ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));
|
||||
ExecutorsUtils.closeScheduledThreadPool(executionMonitorExecutor, Duration.ofSeconds(5), List.of(executionMonitorFuture));
|
||||
try {
|
||||
if (onClose != null) {
|
||||
onClose.run();
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.server;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.utils.ExecutorsUtils;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
@@ -8,9 +9,11 @@ import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
@@ -25,6 +28,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
|
||||
protected final ServerConfig serverConfig;
|
||||
private final AtomicBoolean isStopped = new AtomicBoolean(false);
|
||||
private ScheduledExecutorService scheduledExecutorService;
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
private Instant lastScheduledExecution;
|
||||
|
||||
/**
|
||||
@@ -98,7 +102,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
|
||||
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
|
||||
Duration scheduleInterval = getScheduleInterval();
|
||||
log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval);
|
||||
scheduledExecutorService.scheduleAtFixedRate(
|
||||
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
|
||||
this,
|
||||
0,
|
||||
scheduleInterval.toSeconds(),
|
||||
@@ -133,20 +137,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
|
||||
@Override
|
||||
public void close() {
|
||||
if (isStopped.compareAndSet(false, true) && scheduledExecutorService != null) {
|
||||
scheduledExecutorService.shutdown();
|
||||
if (scheduledExecutorService.isTerminated()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Timeout", name);
|
||||
}
|
||||
log.debug("Stopped scheduled '{}' task.", name);
|
||||
} catch (InterruptedException e) {
|
||||
scheduledExecutorService.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Interrupted.", name);
|
||||
}
|
||||
ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(scheduledFuture));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.flows.input.InputAndValue;
|
||||
import io.kestra.core.models.hierarchies.AbstractGraphTask;
|
||||
import io.kestra.core.models.hierarchies.GraphCluster;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
@@ -56,8 +57,7 @@ import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
import static io.kestra.core.utils.Rethrow.throwPredicate;
|
||||
import static io.kestra.core.utils.Rethrow.*;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
@@ -122,21 +122,38 @@ public class ExecutionService {
|
||||
* Retry sets the given taskRun to the CREATED state
|
||||
* and returns the execution in the RUNNING state
|
||||
**/
|
||||
public Execution retryTask(Execution execution, String taskRunId) {
|
||||
List<TaskRun> newTaskRuns = execution
|
||||
.getTaskRunList()
|
||||
.stream()
|
||||
.map(taskRun -> {
|
||||
if (taskRun.getId().equals(taskRunId)) {
|
||||
return taskRun
|
||||
.withState(State.Type.CREATED);
|
||||
public Execution retryTask(Execution execution, Flow flow, String taskRunId) throws InternalException {
|
||||
TaskRun taskRun = execution.findTaskRunByTaskRunId(taskRunId).withState(State.Type.CREATED);
|
||||
List<TaskRun> taskRunList = execution.getTaskRunList();
|
||||
|
||||
if (taskRun.getParentTaskRunId() != null) {
|
||||
// we need to find the parent to remove any errors or finally tasks already executed
|
||||
TaskRun parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
|
||||
Task parentTask = flow.findTaskByTaskId(parentTaskRun.getTaskId());
|
||||
if (parentTask instanceof FlowableTask<?> flowableTask) {
|
||||
if (flowableTask.getErrors() != null) {
|
||||
List<Task> allErrors = Stream.concat(flowableTask.getErrors().stream()
|
||||
.filter(task -> task.isFlowable() && ((FlowableTask<?>) task).getErrors() != null)
|
||||
.flatMap(task -> ((FlowableTask<?>) task).getErrors().stream()),
|
||||
flowableTask.getErrors().stream())
|
||||
.toList();
|
||||
allErrors.forEach(error -> taskRunList.removeIf(t -> t.getTaskId().equals(error.getId())));
|
||||
}
|
||||
|
||||
return taskRun;
|
||||
})
|
||||
.toList();
|
||||
if (flowableTask.getFinally() != null) {
|
||||
List<Task> allFinally = Stream.concat(flowableTask.getFinally().stream()
|
||||
.filter(task -> task.isFlowable() && ((FlowableTask<?>) task).getFinally() != null)
|
||||
.flatMap(task -> ((FlowableTask<?>) task).getFinally().stream()),
|
||||
flowableTask.getFinally().stream())
|
||||
.toList();
|
||||
allFinally.forEach(error -> taskRunList.removeIf(t -> t.getTaskId().equals(error.getId())));
|
||||
}
|
||||
}
|
||||
|
||||
return execution.withTaskRunList(newTaskRuns).withState(State.Type.RUNNING);
|
||||
return execution.withTaskRunList(taskRunList).withTaskRun(taskRun).withState(State.Type.RUNNING);
|
||||
}
|
||||
|
||||
return execution.withTaskRun(taskRun).withState(State.Type.RUNNING);
|
||||
}
|
||||
|
||||
public Execution retryWaitFor(Execution execution, String flowableTaskRunId) {
|
||||
@@ -317,6 +334,32 @@ public class ExecutionService {
|
||||
return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
|
||||
}
|
||||
|
||||
public Execution changeTaskRunState(final Execution execution, Flow flow, String taskRunId, State.Type newState) throws Exception {
|
||||
Execution newExecution = markAs(execution, flow, taskRunId, newState);
|
||||
|
||||
// if the execution was terminated, it could have executed errors/finally/afterExecution tasks; we must remove them as the execution will be restarted
|
||||
if (execution.getState().isTerminated()) {
|
||||
List<TaskRun> newTaskRuns = newExecution.getTaskRunList();
|
||||
// We need to remove global error tasks and flowable error tasks if any
|
||||
flow
|
||||
.allErrorsWithChildren()
|
||||
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
|
||||
|
||||
// We need to remove global finally tasks and flowable finally tasks if any
|
||||
flow
|
||||
.allFinallyWithChildren()
|
||||
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
|
||||
|
||||
// We need to remove afterExecution tasks
|
||||
ListUtils.emptyOnNull(flow.getAfterExecution())
|
||||
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
|
||||
|
||||
return newExecution.withTaskRunList(newTaskRuns);
|
||||
} else {
|
||||
return newExecution;
|
||||
}
|
||||
}
|
||||
|
||||
public Execution markAs(final Execution execution, FlowInterface flow, String taskRunId, State.Type newState) throws Exception {
|
||||
return this.markAs(execution, flow, taskRunId, newState, null, null);
|
||||
}
|
||||
@@ -396,7 +439,8 @@ public class ExecutionService {
|
||||
@Nullable String flowId,
|
||||
@Nullable ZonedDateTime startDate,
|
||||
@Nullable ZonedDateTime endDate,
|
||||
@Nullable List<State.Type> state
|
||||
@Nullable List<State.Type> state,
|
||||
int batchSize
|
||||
) throws IOException {
|
||||
PurgeResult purgeResult = this.executionRepository
|
||||
.find(
|
||||
@@ -413,24 +457,27 @@ public class ExecutionService {
|
||||
null,
|
||||
true
|
||||
)
|
||||
.map(throwFunction(execution -> {
|
||||
.buffer(batchSize)
|
||||
.map(throwFunction(executions -> {
|
||||
PurgeResult.PurgeResultBuilder<?, ?> builder = PurgeResult.builder();
|
||||
|
||||
if (purgeExecution) {
|
||||
builder.executionsCount(this.executionRepository.purge(execution));
|
||||
builder.executionsCount(this.executionRepository.purge(executions));
|
||||
}
|
||||
|
||||
if (purgeLog) {
|
||||
builder.logsCount(this.logRepository.purge(execution));
|
||||
builder.logsCount(this.logRepository.purge(executions));
|
||||
}
|
||||
|
||||
if (purgeMetric) {
|
||||
builder.metricsCount(this.metricRepository.purge(execution));
|
||||
builder.metricsCount(this.metricRepository.purge(executions));
|
||||
}
|
||||
|
||||
if (purgeStorage) {
|
||||
URI uri = StorageContext.forExecution(execution).getExecutionStorageURI(StorageContext.KESTRA_SCHEME);
|
||||
builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), execution.getNamespace(), uri).size());
|
||||
executions.forEach(throwConsumer(execution -> {
|
||||
URI uri = StorageContext.forExecution(execution).getExecutionStorageURI(StorageContext.KESTRA_SCHEME);
|
||||
builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), execution.getNamespace(), uri).size());
|
||||
}));
|
||||
}
|
||||
|
||||
return (PurgeResult) builder.build();
|
||||
@@ -671,7 +718,7 @@ public class ExecutionService {
|
||||
// An edge case can exist where the execution is resumed automatically before we resume it with a killing.
|
||||
try {
|
||||
newExecution = this.resume(execution, flow, State.Type.KILLING, null);
|
||||
newExecution = newExecution.withState(afterKillState.orElse(newExecution.getState().getCurrent()));
|
||||
newExecution = newExecution.withState(killingOrAfterKillState);
|
||||
} catch (Exception e) {
|
||||
// if we cannot resume, we still set it to KILLING, so we don't throw
|
||||
log.warn("Unable to resume a paused execution before killing it", e);
|
||||
@@ -684,6 +731,7 @@ public class ExecutionService {
|
||||
eventPublisher.publishEvent(new CrudEvent<>(newExecution, execution, CrudEventType.UPDATE));
|
||||
return newExecution;
|
||||
}
|
||||
|
||||
public Execution kill(Execution execution, FlowInterface flow) {
|
||||
return this.kill(execution, flow, Optional.empty());
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.FlowWithException;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.models.tasks.RunnableTask;
|
||||
import io.kestra.core.models.topologies.FlowTopology;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
@@ -51,7 +52,6 @@ import java.util.stream.StreamSupport;
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class FlowService {
|
||||
|
||||
@Inject
|
||||
Optional<FlowRepositoryInterface> flowRepository;
|
||||
|
||||
@@ -236,6 +236,7 @@ public class FlowService {
|
||||
}
|
||||
|
||||
List<String> warnings = new ArrayList<>(checkValidSubflows(flow, tenantId));
|
||||
|
||||
List<io.kestra.plugin.core.trigger.Flow> flowTriggers = ListUtils.emptyOnNull(flow.getTriggers()).stream()
|
||||
.filter(io.kestra.plugin.core.trigger.Flow.class::isInstance)
|
||||
.map(io.kestra.plugin.core.trigger.Flow.class::cast)
|
||||
@@ -246,6 +247,21 @@ public class FlowService {
|
||||
}
|
||||
});
|
||||
|
||||
// add a warning for runnable-only properties (timeout, workerGroup, taskCache) when used on a task that is not runnable
|
||||
flow.allTasksWithChilds().forEach(task -> {
|
||||
if (!(task instanceof RunnableTask<?>)) {
|
||||
if (task.getTimeout() != null) {
|
||||
warnings.add("The task '" + task.getId() + "' cannot use the 'timeout' property as it's only relevant for runnable tasks.");
|
||||
}
|
||||
if (task.getTaskCache() != null) {
|
||||
warnings.add("The task '" + task.getId() + "' cannot use the 'taskCache' property as it's only relevant for runnable tasks.");
|
||||
}
|
||||
if (task.getWorkerGroup() != null) {
|
||||
warnings.add("The task '" + task.getId() + "' cannot use the 'workerGroup' property as it's only relevant for runnable tasks.");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return warnings;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionKind;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowWithException;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
@@ -10,7 +11,6 @@ import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInte
|
||||
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
@@ -24,16 +24,16 @@ import java.util.stream.Stream;
|
||||
|
||||
@Singleton
|
||||
public class FlowTriggerService {
|
||||
@Inject
|
||||
private ConditionService conditionService;
|
||||
private final ConditionService conditionService;
|
||||
private final RunContextFactory runContextFactory;
|
||||
private final FlowService flowService;
|
||||
|
||||
@Inject
|
||||
private RunContextFactory runContextFactory;
|
||||
public FlowTriggerService(ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) {
|
||||
this.conditionService = conditionService;
|
||||
this.runContextFactory = runContextFactory;
|
||||
this.flowService = flowService;
|
||||
}
|
||||
|
||||
@Inject
|
||||
private FlowService flowService;
|
||||
|
||||
// used in EE only
|
||||
public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<FlowWithSource> allFlows) {
|
||||
return allFlows
|
||||
.filter(flow -> !flow.isDisabled())
|
||||
@@ -53,6 +53,8 @@ public class FlowTriggerService {
|
||||
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
|
||||
// prevent recursive flow triggers
|
||||
.filter(flow -> flowService.removeUnwanted(flow, execution))
|
||||
// filter out Test Executions
|
||||
.filter(flow -> execution.getKind() == null)
|
||||
// ensure flow & triggers are enabled
|
||||
.filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
|
||||
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
|
||||
|
||||
@@ -173,18 +173,15 @@ public class PluginDefaultService {
|
||||
try {
|
||||
return this.injectAllDefaults(flow, false);
|
||||
} catch (Exception e) {
|
||||
RunContextLogger
|
||||
.logEntries(
|
||||
Execution.loggingEventFromException(e),
|
||||
LogEntry.of(execution)
|
||||
)
|
||||
.forEach(logEntry -> {
|
||||
try {
|
||||
logQueue.emitAsync(logEntry);
|
||||
} catch (QueueException e1) {
|
||||
// silently do nothing
|
||||
}
|
||||
});
|
||||
try {
|
||||
logQueue.emitAsync(RunContextLogger
|
||||
.logEntries(
|
||||
Execution.loggingEventFromException(e),
|
||||
LogEntry.of(execution)
|
||||
));
|
||||
} catch (QueueException e1) {
|
||||
// silently do nothing
|
||||
}
|
||||
return readWithoutDefaultsOrThrow(flow);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.services;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.util.Collections;
|
||||
@@ -14,6 +15,7 @@ public class SkipExecutionService {
|
||||
private volatile List<FlowId> skipFlows = Collections.emptyList();
|
||||
private volatile List<NamespaceId> skipNamespaces = Collections.emptyList();
|
||||
private volatile List<String> skipTenants = Collections.emptyList();
|
||||
private volatile List<String> skipIndexerRecords = Collections.emptyList();
|
||||
|
||||
public synchronized void setSkipExecutions(List<String> skipExecutions) {
|
||||
this.skipExecutions = skipExecutions == null ? Collections.emptyList() : skipExecutions;
|
||||
@@ -31,6 +33,10 @@ public class SkipExecutionService {
|
||||
this.skipTenants = skipTenants == null ? Collections.emptyList() : skipTenants;
|
||||
}
|
||||
|
||||
public synchronized void setSkipIndexerRecords(List<String> skipIndexerRecords) {
|
||||
this.skipIndexerRecords = skipIndexerRecords == null ? Collections.emptyList() : skipIndexerRecords;
|
||||
}
|
||||
|
||||
/**
|
||||
* Warning: this method doesn't check the flow, so it must be used only when none of the other variants can be used.
|
||||
*/
|
||||
@@ -46,6 +52,14 @@ public class SkipExecutionService {
|
||||
return skipExecution(taskRun.getTenantId(), taskRun.getNamespace(), taskRun.getFlowId(), taskRun.getExecutionId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Skip an indexer record based on its key.
|
||||
* @param key the record key as computed by <code>QueueService.key(record)</code>; may be null
|
||||
*/
|
||||
public boolean skipIndexerRecord(@Nullable String key) {
|
||||
return key != null && skipIndexerRecords.contains(key);
|
||||
}
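A usage sketch (the key format is whatever QueueService.key(record) produces; the literal below is purely illustrative):

    skipExecutionService.setSkipIndexerRecords(List.of("some-record-key"));
    skipExecutionService.skipIndexerRecord("some-record-key"); // true
    skipExecutionService.skipIndexerRecord(null);              // false: null keys are never skipped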
|
||||
|
||||
@VisibleForTesting
|
||||
boolean skipExecution(String tenant, String namespace, String flow, String executionId) {
|
||||
return (tenant != null && skipTenants.contains(tenant)) ||
|
||||
|
||||
@@ -3,12 +3,16 @@ package io.kestra.core.utils;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class ExecutorsUtils {
|
||||
@Inject
|
||||
private ThreadMainFactoryBuilder threadFactoryBuilder;
|
||||
@@ -61,6 +65,29 @@ public class ExecutorsUtils {
|
||||
);
|
||||
}
|
||||
|
||||
public static void closeScheduledThreadPool(ScheduledExecutorService scheduledExecutorService, Duration gracePeriod, List<ScheduledFuture<?>> taskFutures) {
|
||||
scheduledExecutorService.shutdown();
|
||||
if (scheduledExecutorService.isTerminated()) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (!scheduledExecutorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
|
||||
log.warn("Failed to shutdown the ScheduledThreadPoolExecutor during grace period, forcing it to shutdown now");
|
||||
|
||||
// Ensure the scheduled tasks reach a terminal state to avoid a possible memory leak
|
||||
ListUtils.emptyOnNull(taskFutures).forEach(taskFuture -> taskFuture.cancel(true));
|
||||
|
||||
scheduledExecutorService.shutdownNow();
|
||||
}
|
||||
log.debug("Stopped scheduled ScheduledThreadPoolExecutor.");
|
||||
} catch (InterruptedException e) {
|
||||
scheduledExecutorService.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
log.debug("Failed to shutdown the ScheduledThreadPoolExecutor.");
|
||||
}
|
||||
}
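Callers use this exactly as the schedulers above do: shut the pool down within a grace period and pass the known futures so they can be cancelled if the pool does not terminate in time.

    // As in AbstractScheduler.close() above:
    ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));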
|
||||
|
||||
private ExecutorService wrap(String name, ExecutorService executorService) {
|
||||
return ExecutorServiceMetrics.monitor(
|
||||
meterRegistry,
|
||||
|
||||
@@ -54,9 +54,10 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it.");
|
||||
}
|
||||
|
||||
List<Task> allTasks = value.allTasksWithChilds();
|
||||
|
||||
// task ids must be unique
|
||||
List<String> taskIds = value.allTasksWithChilds()
|
||||
.stream()
|
||||
List<String> taskIds = allTasks.stream()
|
||||
.map(Task::getId)
|
||||
.toList();
|
||||
|
||||
@@ -72,8 +73,8 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
violations.add("Duplicate trigger id with name [" + String.join(", ", duplicateIds) + "]");
|
||||
}
|
||||
|
||||
value.allTasksWithChilds()
|
||||
.stream().filter(task -> task instanceof ExecutableTask<?> executableTask
|
||||
allTasks.stream()
|
||||
.filter(task -> task instanceof ExecutableTask<?> executableTask
|
||||
&& value.getId().equals(executableTask.subflowId().flowId())
|
||||
&& value.getNamespace().equals(executableTask.subflowId().namespace()))
|
||||
.forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));
|
||||
@@ -102,7 +103,7 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
.map(input -> Pattern.compile("\\{\\{\\s*inputs." + input.getId() + "\\s*\\}\\}"))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
List<String> invalidTasks = value.allTasks()
|
||||
List<String> invalidTasks = allTasks.stream()
|
||||
.filter(task -> checkObjectFieldsWithPatterns(task, inputsWithMinusPatterns))
|
||||
.map(task -> task.getId())
|
||||
.collect(Collectors.toList());
|
||||
@@ -112,12 +113,12 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
" [" + String.join(", ", invalidTasks) + "]");
|
||||
}
|
||||
|
||||
List<Pattern> outputsWithMinusPattern = value.allTasks()
|
||||
List<Pattern> outputsWithMinusPattern = allTasks.stream()
|
||||
.filter(output -> Optional.ofNullable(output.getId()).orElse("").contains("-"))
|
||||
.map(output -> Pattern.compile("\\{\\{\\s*outputs\\." + output.getId() + "\\.[^}]+\\s*\\}\\}"))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
invalidTasks = value.allTasks()
|
||||
invalidTasks = allTasks.stream()
|
||||
.filter(task -> checkObjectFieldsWithPatterns(task, outputsWithMinusPattern))
|
||||
.map(task -> task.getId())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
@@ -90,7 +90,7 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
|
||||
private static final String OUTPUTS_VAR = "outputs";
|
||||
|
||||
@NotNull
|
||||
private Property<String> expression;
|
||||
private Property<Boolean> expression;
|
||||
|
||||
/** {@inheritDoc} **/
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -105,9 +105,8 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
|
||||
conditionContext.getVariables(),
|
||||
Map.of(TRIGGER_VAR, Map.of(OUTPUTS_VAR, conditionContext.getExecution().getOutputs()))
|
||||
);
|
||||
|
||||
String render = conditionContext.getRunContext().render(expression).as(String.class, variables).orElseThrow();
|
||||
return !(render.isBlank() || render.trim().equals("false"));
|
||||
|
||||
return conditionContext.getRunContext().render(expression).skipCache().as(Boolean.class, variables).orElseThrow();
|
||||
}
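With the property now typed as Property<Boolean>, the condition expression itself must evaluate to a boolean; a sketch of what such a condition could carry (Property.ofExpression is assumed here, and the Pebble expression is the illustrative part — outputs are exposed under trigger.outputs as in the variables map above):

    Property<Boolean> expression = Property.ofExpression("{{ trigger.outputs.status == 'SUCCESS' }}");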
|
||||
|
||||
private boolean hasNoOutputs(final Execution execution) {
|
||||
|
||||
@@ -19,7 +19,6 @@ import lombok.experimental.SuperBuilder;
|
||||
@NoArgsConstructor
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
@EqualsAndHashCode
|
||||
//@TriggersDataFilterValidation
|
||||
@Schema(
|
||||
title = "Display Execution data in a dashboard chart.",
|
||||
description = "Execution data can be displayed in charts broken out by Namespace and filtered by State, for example."
|
||||
|
||||
@@ -21,6 +21,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
@@ -68,6 +69,7 @@ import java.util.Optional;
|
||||
)
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
public class Exit extends Task implements ExecutionUpdatableTask {
|
||||
@NotNull
|
||||
@Schema(
|
||||
@@ -104,12 +106,13 @@ public class Exit extends Task implements ExecutionUpdatableTask {
|
||||
// ends all parents
|
||||
while (newTaskRun.getParentTaskRunId() != null) {
|
||||
newTaskRun = newExecution.findTaskRunByTaskRunId(newTaskRun.getParentTaskRunId()).withState(exitState);
|
||||
newExecution = execution.withTaskRun(newTaskRun);
|
||||
newExecution = newExecution.withTaskRun(newTaskRun);
|
||||
}
|
||||
return newExecution;
|
||||
} catch (InternalException e) {
|
||||
// in case we cannot update the last non-terminated task run, we ignore it
|
||||
return execution;
|
||||
log.warn("Unable to update the taskrun state", e);
|
||||
return execution.withState(exitState);
|
||||
}
|
||||
})
|
||||
.orElse(execution)
|
||||
|
||||
@@ -102,6 +102,14 @@ public class PurgeExecutions extends Task implements RunnableTask<PurgeExecution
|
||||
@Builder.Default
|
||||
private Property<Boolean> purgeStorage = Property.ofValue(true);
|
||||
|
||||
@Schema(
|
||||
title = "The size of the bulk delete",
|
||||
description = "For performance, deletion is made by batch of by default 100 executions/logs/metrics."
|
||||
)
|
||||
@Builder.Default
|
||||
@NotNull
|
||||
private Property<Integer> batchSize = Property.ofValue(100);
|
||||
|
||||
@Override
|
||||
public PurgeExecutions.Output run(RunContext runContext) throws Exception {
|
||||
ExecutionService executionService = ((DefaultRunContext)runContext).getApplicationContext().getBean(ExecutionService.class);
|
||||
@@ -124,9 +132,10 @@ public class PurgeExecutions extends Task implements RunnableTask<PurgeExecution
|
||||
flowInfo.tenantId(),
|
||||
renderedNamespace,
|
||||
runContext.render(flowId).as(String.class).orElse(null),
|
||||
startDate != null ? ZonedDateTime.parse(runContext.render(startDate).as(String.class).orElseThrow()) : null,
|
||||
runContext.render(startDate).as(String.class).map(ZonedDateTime::parse).orElse(null),
|
||||
ZonedDateTime.parse(runContext.render(endDate).as(String.class).orElseThrow()),
|
||||
this.states == null ? null : runContext.render(this.states).asList(State.Type.class)
|
||||
this.states == null ? null : runContext.render(this.states).asList(State.Type.class),
|
||||
runContext.render(this.batchSize).as(Integer.class).orElseThrow()
|
||||
);
|
||||
|
||||
return Output.builder()
|
||||
|
||||
@@ -208,48 +208,50 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
             return Optional.empty();
         }

-        boolean isOutputsAllowed = runContext
-            .<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
-            .orElse(true);
-
         final Output.OutputBuilder builder = Output.builder()
             .executionId(execution.getId())
             .state(execution.getState().getCurrent());

-        final Map<String, Object> subflowOutputs = Optional
-            .ofNullable(flow.getOutputs())
-            .map(outputs -> outputs
-                .stream()
-                .collect(Collectors.toMap(
-                    io.kestra.core.models.flows.Output::getId,
-                    io.kestra.core.models.flows.Output::getValue)
-                )
-            )
-            .orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
-
         VariablesService variablesService = ((DefaultRunContext) runContext).getApplicationContext().getBean(VariablesService.class);
-        if (subflowOutputs != null) {
-            try {
-                Map<String, Object> outputs = runContext.render(subflowOutputs);
-                FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
-                if (flow.getOutputs() != null && flowInputOutput != null) {
-                    outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
-                }
-                builder.outputs(outputs);
-            } catch (Exception e) {
-                runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
-                var state = State.Type.fail(this);
-                Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
-                taskRun = taskRun
-                    .withState(state)
-                    .withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
-                    .withOutputs(variables);
-
-                return Optional.of(SubflowExecutionResult.builder()
-                    .executionId(execution.getId())
-                    .state(State.Type.FAILED)
-                    .parentTaskRun(taskRun)
-                    .build());
+        if (this.wait) { // we only compute outputs if we wait for the subflow
+            boolean isOutputsAllowed = runContext
+                .<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
+                .orElse(true);
+
+            final Map<String, Object> subflowOutputs = Optional
+                .ofNullable(flow.getOutputs())
+                .map(outputs -> outputs
+                    .stream()
+                    .collect(Collectors.toMap(
+                        io.kestra.core.models.flows.Output::getId,
+                        io.kestra.core.models.flows.Output::getValue)
+                    )
+                )
+                .orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
+
+            if (subflowOutputs != null) {
+                try {
+                    Map<String, Object> outputs = runContext.render(subflowOutputs);
+                    FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
+                    if (flow.getOutputs() != null && flowInputOutput != null) {
+                        outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
+                    }
+                    builder.outputs(outputs);
+                } catch (Exception e) {
+                    runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
+                    var state = State.Type.fail(this);
+                    Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
+                    taskRun = taskRun
+                        .withState(state)
+                        .withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
+                        .withOutputs(variables);
+
+                    return Optional.of(SubflowExecutionResult.builder()
+                        .executionId(execution.getId())
+                        .state(State.Type.FAILED)
+                        .parentTaskRun(taskRun)
+                        .build());
+                }
             }
         }
@@ -102,7 +102,7 @@ public class Switch extends Task implements FlowableTask<Switch.Output> {
     @Schema(
         title = "The map of keys and a list of tasks to be executed if the conditional `value` matches the key."
     )
-    @PluginProperty
+    @PluginProperty(additionalProperties = Task[].class)
    private Map<String, List<Task>> cases;

    @Valid
@@ -173,8 +173,8 @@ public class Download extends AbstractHttp implements RunnableTask<Download.Outp
         if (path.indexOf('/') != -1) {
             path = path.substring(path.lastIndexOf('/')); // keep the last segment
         }
-        if (path.indexOf('.') != -1) {
-            return path.substring(path.indexOf('.'));
+        if (path.lastIndexOf('.') != -1) {
+            return path.substring(path.lastIndexOf('.'));
         }
         return null;
     }
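The `indexOf`-to-`lastIndexOf` change above decides which dot wins when the final path segment contains several. A standalone sketch of the corrected logic, with a hypothetical `extensionOf` helper in place of the task's private method:

public class ExtensionSketch {
    static String extensionOf(String path) {
        if (path.indexOf('/') != -1) {
            path = path.substring(path.lastIndexOf('/')); // keep the last segment
        }
        if (path.lastIndexOf('.') != -1) {
            // last dot wins: "report.tar.gz" yields ".gz" where the old
            // first-dot lookup returned ".tar.gz"
            return path.substring(path.lastIndexOf('.'));
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(extensionOf("/files/report.tar.gz")); // .gz
        System.out.println(extensionOf("/files/readme"));        // null
    }
}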
@@ -9,11 +9,11 @@ import io.kestra.core.runners.DefaultRunContext;
 import io.kestra.core.runners.RunContext;
 import io.kestra.core.services.FlowService;
 import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.validation.constraints.NotNull;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.experimental.SuperBuilder;
-import org.codehaus.commons.nullanalysis.NotNull;

 import java.util.NoSuchElementException;

@@ -103,8 +103,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {

         KVStore kvStore = runContext.namespaceKv(renderedNamespace);

-        if (kvType != null && renderedValue instanceof String renderedValueStr) {
-            renderedValue = switch (runContext.render(kvType).as(KVType.class).orElseThrow()) {
+        if (kvType != null){
+            KVType renderedKvType = runContext.render(kvType).as(KVType.class).orElseThrow();
+            if (renderedValue instanceof String renderedValueStr) {
+                renderedValue = switch (renderedKvType) {
                 case NUMBER -> JacksonMapper.ofJson().readValue(renderedValueStr, Number.class);
                 case BOOLEAN -> Boolean.parseBoolean((String) renderedValue);
                 case DATETIME, DATE -> Instant.parse(renderedValueStr);
@@ -112,7 +114,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {
                 case JSON -> JacksonMapper.toObject(renderedValueStr);
                 default -> renderedValue;
             };
+            } else if (renderedValue instanceof Number valueNumber && renderedKvType == KVType.STRING) {
+                renderedValue = valueNumber.toString();
+            }
         }

         kvStore.put(renderedKey, new KVValueAndMetadata(
             new KVMetadata(
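The reworked block above coerces in both directions: a rendered string is parsed into the declared KV type, and a numeric value is stringified when `STRING` is declared. A reduced sketch of that dispatch, with a hypothetical enum and `Double.valueOf` standing in for the real Jackson parsing:

public class KvCoercionSketch {
    enum KVType { STRING, NUMBER, BOOLEAN }

    static Object coerce(Object value, KVType type) {
        if (value instanceof String s) {
            return switch (type) {
                case NUMBER -> Double.valueOf(s);        // stand-in for the JSON mapper
                case BOOLEAN -> Boolean.parseBoolean(s);
                default -> value;                        // already a string
            };
        } else if (value instanceof Number n && type == KVType.STRING) {
            return n.toString();                         // the newly added direction
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(coerce("42", KVType.NUMBER)); // 42.0
        System.out.println(coerce(42, KVType.STRING));   // 42 (as a String)
    }
}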
@@ -56,7 +56,8 @@ public class OverrideRetryInterceptor implements MethodInterceptor<Object, Objec
             retry.get("delay", Duration.class).orElse(Duration.ofSeconds(1)),
             retry.get("maxDelay", Duration.class).orElse(null),
             new DefaultRetryPredicate(resolveIncludes(retry, "includes"), resolveIncludes(retry, "excludes")),
-            Throwable.class
+            Throwable.class,
+            0
         );

         MutableConvertibleValues<Object> attrs = context.getAttributes();
@@ -0,0 +1,11 @@
+<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
+<g clip-path="url(#clip0_1765_9330)">
+<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
+<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
+<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
+<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
+<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
+<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
+<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
+</g>
+</svg>
@@ -0,0 +1,11 @@
+<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
+<g clip-path="url(#clip0_1765_9330)">
+<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
+<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
+<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
+<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
+<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
+<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
+<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
+</g>
+</svg>
@@ -112,7 +112,7 @@ class JsonSchemaGeneratorTest {

         var requiredWithDefault = definitions.get("io.kestra.core.docs.JsonSchemaGeneratorTest-RequiredWithDefault");
         assertThat(requiredWithDefault, is(notNullValue()));
-        assertThat((List<String>) requiredWithDefault.get("required"), not(contains("requiredWithDefault")));
+        assertThat((List<String>) requiredWithDefault.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));

         var properties = (Map<String, Map<String, Object>>) flow.get("properties");
         var listeners = properties.get("listeners");
@@ -253,7 +253,7 @@ class JsonSchemaGeneratorTest {
     void requiredAreRemovedIfThereIsADefault() {
         Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, RequiredWithDefault.class);
         assertThat(generate, is(not(nullValue())));
-        assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault")));
+        assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
         assertThat((List<String>) generate.get("required"), containsInAnyOrder("requiredWithNoDefault"));
     }

@@ -466,6 +466,11 @@ class JsonSchemaGeneratorTest {
         @Builder.Default
         private Property<TaskWithEnum.TestClass> requiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test").build());

+        @PluginProperty
+        @NotNull
+        @Builder.Default
+        private Property<TaskWithEnum.TestClass> anotherRequiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test2").build());
+
         @PluginProperty
         @NotNull
         private Property<TaskWithEnum.TestClass> requiredWithNoDefault;

@@ -44,6 +44,7 @@ import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.*;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;

 import static io.kestra.core.models.flows.FlowScope.USER;
@@ -392,6 +393,20 @@ public abstract class AbstractExecutionRepositoryTest {
         assertThat(full.isPresent()).isFalse();
     }

+    @Test
+    protected void purgeExecutions() {
+        var execution1 = ExecutionFixture.EXECUTION_1;
+        executionRepository.save(execution1);
+        var execution2 = ExecutionFixture.EXECUTION_2;
+        executionRepository.save(execution2);
+
+        var results = executionRepository.purge(List.of(execution1, execution2));
+        assertThat(results).isEqualTo(2);
+
+        assertThat(executionRepository.findById(null, execution1.getId())).isEmpty();
+        assertThat(executionRepository.findById(null, execution2.getId())).isEmpty();
+    }
+
     @Test
     protected void delete() {
         executionRepository.save(ExecutionFixture.EXECUTION_1);
@@ -740,4 +755,16 @@ public abstract class AbstractExecutionRepositoryTest {
         executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
         assertThat(executions.size()).isEqualTo(0L);
     }

+    @Test
+    protected void shouldReturnLastExecutionsWhenInputsAreNull() {
+        inject();
+
+        List<Execution> lastExecutions = executionRepository.lastExecutions(MAIN_TENANT, null);
+
+        assertThat(lastExecutions).isNotEmpty();
+        Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet());
+        assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
+    }
+
 }
@@ -113,7 +113,8 @@ public abstract class AbstractExecutionServiceTest {
             flow.getId(),
             null,
             ZonedDateTime.now(),
-            null
+            null,
+            100
         );

         assertThat(purge.getExecutionsCount()).isEqualTo(1);
@@ -131,7 +132,8 @@ public abstract class AbstractExecutionServiceTest {
             flow.getId(),
             null,
             ZonedDateTime.now(),
-            null
+            null,
+            100
         );

         assertThat(purge.getExecutionsCount()).isZero();
@@ -281,18 +281,6 @@ public abstract class AbstractFlowRepositoryTest {
         assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
     }

-    @Test
-    void findByNamespace() {
-        List<Flow> save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests");
-        assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 22);
-
-        save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests2");
-        assertThat((long) save.size()).isEqualTo(1L);
-
-        save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests.minimal.bis");
-        assertThat((long) save.size()).isEqualTo(1L);
-    }
-
     @Test
     void findByNamespacePrefix() {
         List<Flow> save = flowRepository.findByNamespacePrefix(MAIN_TENANT, "io.kestra.tests");
@@ -609,13 +597,7 @@ public abstract class AbstractFlowRepositoryTest {
         assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
         assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
     }

-    @Test
-    void findDistinctNamespace() {
-        List<String> distinctNamespace = flowRepository.findDistinctNamespace(MAIN_TENANT);
-        assertThat((long) distinctNamespace.size()).isEqualTo(8L);
-    }
-
     @Test
     protected void shouldReturnNullRevisionForNonExistingFlow() {
         assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, IdUtils.create())).isNull();
@@ -13,6 +13,8 @@ import io.kestra.core.models.executions.LogEntry;
 import io.kestra.core.models.flows.State;
 import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
 import io.kestra.core.utils.IdUtils;
 import io.kestra.core.utils.TestsUtils;
+import io.kestra.plugin.core.dashboard.data.Executions;
+import io.kestra.plugin.core.dashboard.data.Logs;
 import io.micronaut.data.model.Pageable;
 import jakarta.inject.Inject;
@@ -41,12 +43,16 @@ public abstract class AbstractLogRepositoryTest {
     @Inject
     protected LogRepositoryInterface logRepository;

-    protected static LogEntry.LogEntryBuilder logEntry(Level level) {
+    private static LogEntry.LogEntryBuilder logEntry(Level level) {
+        return logEntry(level, "executionId");
+    }
+
+    protected static LogEntry.LogEntryBuilder logEntry(Level level, String executionId) {
         return LogEntry.builder()
             .flowId("flowId")
             .namespace("io.kestra.unittest")
             .taskId("taskId")
-            .executionId("executionId")
+            .executionId(executionId)
             .taskRunId(IdUtils.create())
             .attemptNumber(0)
             .timestamp(Instant.now())
@@ -343,4 +349,15 @@ public abstract class AbstractLogRepositoryTest {

         assertThat(results).hasSize(1);
     }

+    @Test
+    void purge() {
+        logRepository.save(logEntry(Level.INFO, "execution1").build());
+        logRepository.save(logEntry(Level.INFO, "execution1").build());
+        logRepository.save(logEntry(Level.INFO, "execution2").build());
+        logRepository.save(logEntry(Level.INFO, "execution2").build());
+
+        var result = logRepository.purge(List.of(Execution.builder().id("execution1").build(), Execution.builder().id("execution2").build()));
+        assertThat(result).isEqualTo(4);
+    }
 }
@@ -1,6 +1,7 @@
 package io.kestra.core.repositories;

 import com.devskiller.friendly_id.FriendlyId;
+import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.executions.ExecutionKind;
 import io.kestra.core.models.executions.MetricEntry;
 import io.kestra.core.models.executions.TaskRun;
@@ -11,6 +12,7 @@ import io.micronaut.data.model.Pageable;
 import io.kestra.core.junit.annotations.KestraTest;
 import jakarta.inject.Inject;
 import org.junit.jupiter.api.Test;
+import org.slf4j.event.Level;

 import java.time.Duration;
 import java.time.ZonedDateTime;
@@ -115,6 +117,17 @@ public abstract class AbstractMetricRepositoryTest {
         assertThat(results).hasSize(3);
     }

+    @Test
+    void purge() {
+        metricRepository.save(MetricEntry.of(taskRun("execution1", "task"), counter("counter1"), null));
+        metricRepository.save(MetricEntry.of(taskRun("execution1", "task"), counter("counter2"), null));
+        metricRepository.save(MetricEntry.of(taskRun("execution2", "task"), counter("counter1"), null));
+        metricRepository.save(MetricEntry.of(taskRun("execution2", "task"), counter("counter2"), null));
+
+        var result = metricRepository.purge(List.of(Execution.builder().id("execution1").build(), Execution.builder().id("execution2").build()));
+        assertThat(result).isEqualTo(4);
+    }
+
     private Counter counter(String metricName) {
         return Counter.of(metricName, 1);
     }
@@ -387,6 +387,13 @@ public abstract class AbstractRunnerTest {
         forEachItemCaseTest.forEachItemInIf();
     }

+    @Test
+    @LoadFlows({"flows/valids/for-each-item-subflow-after-execution.yaml",
+        "flows/valids/for-each-item-after-execution.yaml"})
+    protected void forEachItemWithAfterExecution() throws Exception {
+        forEachItemCaseTest.forEachItemWithAfterExecution();
+    }
+
     @Test
     @LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
     void concurrencyCancel() throws Exception {
@@ -423,6 +430,24 @@ public abstract class AbstractRunnerTest {
         flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
     }

+    @Test
+    @LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
+    protected void concurrencyQueueRestarted() throws Exception {
+        flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
+    }
+
+    @Test
+    @LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
+    void concurrencyQueueAfterExecution() throws Exception {
+        flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
+    }
+
+    @Test
+    @LoadFlows({"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"})
+    void flowConcurrencySubflow() throws Exception {
+        flowConcurrencyCaseTest.flowConcurrencySubflow();
+    }
+
     @Test
     @ExecuteFlow("flows/valids/executable-fail.yml")
     void badExecutable(Execution execution) {
@@ -0,0 +1,58 @@
+package io.kestra.core.runners;
+
+import io.kestra.core.exceptions.IllegalVariableEvaluationException;
+import io.kestra.core.junit.annotations.KestraTest;
+import io.micronaut.context.ApplicationContext;
+import io.micronaut.context.annotation.Factory;
+import io.micronaut.context.annotation.Primary;
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.test.annotation.MockBean;
+import jakarta.inject.Inject;
+import jakarta.inject.Singleton;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@KestraTest
+class CustomVariableRendererTest {
+
+    @Inject
+    private SecureVariableRendererFactory secureVariableRendererFactory;
+
+    @Inject
+    private VariableRenderer renderer;
+
+    @Test
+    void shouldUseCustomVariableRender() throws IllegalVariableEvaluationException {
+        // When
+        String result = renderer.render("{{ dummy }}", Map.of());
+
+        // Then
+        assertThat(result).isEqualTo("alternativeRender");
+    }
+
+    @Test
+    void shouldUseCustomVariableRenderWhenUsingSecured() throws IllegalVariableEvaluationException {
+        // Given
+        VariableRenderer renderer = secureVariableRendererFactory.createOrGet();
+
+        // When
+        String result = renderer.render("{{ dummy }}", Map.of());
+
+        // Then
+        assertThat(result).isEqualTo("alternativeRender");
+    }
+
+    @MockBean(VariableRenderer.class)
+    VariableRenderer testCustomRenderer(ApplicationContext applicationContext) {
+        return new VariableRenderer(applicationContext, null) {
+
+            @Override
+            protected String alternativeRender(Exception e, String inline, Map<String, Object> variables) {
+                return "alternativeRender";
+            }
+        };
+    }
+}
@@ -119,6 +119,7 @@ class ExecutionServiceTest {
         assertThat(restart.getState().getHistories()).hasSize(4);
         assertThat(restart.getTaskRunList().stream().filter(taskRun -> taskRun.getState().getCurrent() == State.Type.RESTARTED).count()).isGreaterThan(1L);
+        assertThat(restart.getTaskRunList().stream().filter(taskRun -> taskRun.getState().getCurrent() == State.Type.RUNNING).count()).isGreaterThan(1L);

         assertThat(restart.getTaskRunList().getFirst().getId()).isEqualTo(restart.getTaskRunList().getFirst().getId());
         assertThat(restart.getLabels()).contains(new Label(Label.RESTARTED, "true"));
     }
@@ -413,9 +414,9 @@ class ExecutionServiceTest {

         Execution killed = executionService.kill(execution, flow);

-        assertThat(killed.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
+        assertThat(killed.getState().getCurrent()).isEqualTo(State.Type.KILLING);
         assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
-        assertThat(killed.getState().getHistories()).hasSize(4);
+        assertThat(killed.getState().getHistories()).hasSize(5);
     }

     @Test
@@ -442,4 +443,22 @@ class ExecutionServiceTest {
         assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
         assertThat(killed.getState().getHistories()).hasSize(5);
     }

+    @Test
+    @LoadFlows({"flows/valids/change-state-errors.yaml"})
+    void changeStateWithErrorBranch() throws Exception {
+        Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "change-state-errors");
+        Flow flow = flowRepository.findByExecution(execution);
+
+        assertThat(execution.getTaskRunList()).hasSize(3);
+        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
+
+        Execution restart = executionService.changeTaskRunState(execution, flow, execution.findTaskRunsByTaskId("make_error").getFirst().getId(), State.Type.SUCCESS);
+
+        assertThat(restart.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
+        assertThat(restart.getMetadata().getAttemptNumber()).isEqualTo(2);
+        assertThat(restart.getState().getHistories()).hasSize(4);
+        assertThat(restart.getTaskRunList()).hasSize(2);
+        assertThat(restart.findTaskRunsByTaskId("make_error").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+    }
 }
@@ -9,6 +9,7 @@ import io.micronaut.context.annotation.Property;
 import jakarta.inject.Inject;
 import org.apache.commons.io.FileUtils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;

 import java.io.ByteArrayInputStream;
 import java.io.File;
@@ -97,6 +98,35 @@ class FilesServiceTest {
         assertThat(outputs.size()).isEqualTo(1);
     }

+    @Test
+    void testOutputFilesWithSpecialCharacters(@TempDir Path tempDir) throws Exception {
+        var runContext = runContextFactory.of();
+
+        Path fileWithSpace = tempDir.resolve("with space.txt");
+        Path fileWithUnicode = tempDir.resolve("สวัสดี.txt");
+
+        Files.writeString(fileWithSpace, "content");
+        Files.writeString(fileWithUnicode, "content");
+
+        Path targetFileWithSpace = runContext.workingDir().path().resolve("with space.txt");
+        Path targetFileWithUnicode = runContext.workingDir().path().resolve("สวัสดี.txt");
+
+        Files.copy(fileWithSpace, targetFileWithSpace);
+        Files.copy(fileWithUnicode, targetFileWithUnicode);
+
+        Map<String, URI> outputFiles = FilesService.outputFiles(
+            runContext,
+            List.of("with space.txt", "สวัสดี.txt")
+        );
+
+        assertThat(outputFiles).hasSize(2);
+        assertThat(outputFiles).containsKey("with space.txt");
+        assertThat(outputFiles).containsKey("สวัสดี.txt");
+
+        assertThat(runContext.storage().getFile(outputFiles.get("with space.txt"))).isNotNull();
+        assertThat(runContext.storage().getFile(outputFiles.get("สวัสดี.txt"))).isNotNull();
+    }
+
     private URI createFile() throws IOException {
         File tempFile = File.createTempFile("file", ".txt");
         Files.write(tempFile.toPath(), "Hello World".getBytes());
@@ -8,6 +8,7 @@ import io.kestra.core.queues.QueueException;
 import io.kestra.core.queues.QueueFactoryInterface;
 import io.kestra.core.queues.QueueInterface;
 import io.kestra.core.repositories.FlowRepositoryInterface;
+import io.kestra.core.services.ExecutionService;
 import io.kestra.core.storages.StorageInterface;
 import io.kestra.core.utils.TestsUtils;
 import jakarta.inject.Inject;
@@ -53,6 +54,9 @@ public class FlowConcurrencyCaseTest {
     @Named(QueueFactoryInterface.EXECUTION_NAMED)
     protected QueueInterface<Execution> executionQueue;

+    @Inject
+    private ExecutionService executionService;
+
     public void flowConcurrencyCancel() throws TimeoutException, QueueException, InterruptedException {
         Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
         Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel");
@@ -278,6 +282,161 @@ public class FlowConcurrencyCaseTest {
         assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
     }

+    public void flowConcurrencyQueueRestarted() throws Exception {
+        Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
+        Flow flow = flowRepository
+            .findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", Optional.empty())
+            .orElseThrow();
+        Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
+        executionQueue.emit(execution2);
+
+        assertThat(execution1.getState().isRunning()).isTrue();
+        assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
+
+        var executionResult1 = new AtomicReference<Execution>();
+        var executionResult2 = new AtomicReference<Execution>();
+
+        CountDownLatch latch1 = new CountDownLatch(2);
+        AtomicReference<Execution> failedExecution = new AtomicReference<>();
+        CountDownLatch latch2 = new CountDownLatch(1);
+        CountDownLatch latch3 = new CountDownLatch(1);
+
+        Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
+            if (e.getLeft().getId().equals(execution1.getId())) {
+                executionResult1.set(e.getLeft());
+                if (e.getLeft().getState().getCurrent() == Type.FAILED) {
+                    failedExecution.set(e.getLeft());
+                    latch1.countDown();
+                }
+            }
+
+            if (e.getLeft().getId().equals(execution2.getId())) {
+                executionResult2.set(e.getLeft());
+                if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
+                    latch2.countDown();
+                }
+                if (e.getLeft().getState().getCurrent() == Type.FAILED) {
+                    latch3.countDown();
+                }
+            }
+        });
+
+        assertTrue(latch2.await(1, TimeUnit.MINUTES));
+        assertThat(failedExecution.get()).isNotNull();
+        // here the first fail and the second is now running.
+        // we restart the first one, it should be queued then fail again.
+        Execution restarted = executionService.restart(failedExecution.get(), null);
+        executionQueue.emit(restarted);
+
+        assertTrue(latch3.await(1, TimeUnit.MINUTES));
+        assertTrue(latch1.await(1, TimeUnit.MINUTES));
+        receive.blockLast();
+
+        assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(Type.FAILED);
+        // it should have been queued after restarted
+        assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED)).isTrue();
+        assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.QUEUED)).isTrue();
+        assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(Type.FAILED);
+        assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
+        assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
+        assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
+    }
+
+    public void flowConcurrencyQueueAfterExecution() throws TimeoutException, QueueException, InterruptedException {
+        Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
+        Flow flow = flowRepository
+            .findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", Optional.empty())
+            .orElseThrow();
+        Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
+        executionQueue.emit(execution2);
+
+        assertThat(execution1.getState().isRunning()).isTrue();
+        assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
+
+        var executionResult1 = new AtomicReference<Execution>();
+        var executionResult2 = new AtomicReference<Execution>();
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        CountDownLatch latch3 = new CountDownLatch(1);
+
+        Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
+            if (e.getLeft().getId().equals(execution1.getId())) {
+                executionResult1.set(e.getLeft());
+                if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
+                    latch1.countDown();
+                }
+            }
+
+            if (e.getLeft().getId().equals(execution2.getId())) {
+                executionResult2.set(e.getLeft());
+                if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
+                    latch2.countDown();
+                }
+                if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
+                    latch3.countDown();
+                }
+            }
+        });
+
+        assertTrue(latch1.await(1, TimeUnit.MINUTES));
+        assertTrue(latch2.await(1, TimeUnit.MINUTES));
+        assertTrue(latch3.await(1, TimeUnit.MINUTES));
+        receive.blockLast();
+
+        assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
+        assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
+        assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
+    }
+
+    public void flowConcurrencySubflow() throws TimeoutException, QueueException, InterruptedException {
+        CountDownLatch successLatch = new CountDownLatch(1);
+        CountDownLatch canceledLatch = new CountDownLatch(1);
+        Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
+            if (e.getLeft().getFlowId().equals("flow-concurrency-cancel")) {
+                if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
+                    successLatch.countDown();
+                }
+                if (e.getLeft().getState().getCurrent() == Type.CANCELLED) {
+                    canceledLatch.countDown();
+                }
+            }
+
+            // FIXME we should fail if we receive the cancel execution again but on Kafka it happens
+        });
+
+        Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-subflow", null, null, Duration.ofSeconds(30));
+        Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-subflow");
+
+        assertThat(execution1.getState().isRunning()).isTrue();
+        assertThat(execution2.getState().getCurrent()).isEqualTo(Type.SUCCESS);
+
+        // assert we have one canceled subflow and one in success
+        assertTrue(canceledLatch.await(1, TimeUnit.MINUTES));
+        assertTrue(successLatch.await(1, TimeUnit.MINUTES));
+        receive.blockLast();
+
+        // run another execution to be sure that everything work (purge is correctly done)
+        CountDownLatch newSuccessLatch = new CountDownLatch(1);
+        Flux<Execution> secondReceive = TestsUtils.receive(executionQueue, e -> {
+            if (e.getLeft().getFlowId().equals("flow-concurrency-cancel")) {
+                if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
+                    newSuccessLatch.countDown();
+                }
+            }
+
+            // FIXME we should fail if we receive the cancel execution again but on Kafka it happens
+        });
+        Execution execution3 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-subflow");
+        assertThat(execution3.getState().getCurrent()).isEqualTo(Type.SUCCESS);
+
+        // assert we have two successful subflow
+        assertTrue(newSuccessLatch.await(1, TimeUnit.MINUTES));
+        secondReceive.blockLast();
+    }
+
     private URI storageUpload() throws URISyntaxException, IOException {
         File tempFile = File.createTempFile("file", ".txt");

@@ -83,4 +83,24 @@ class RunContextPropertyTest {
         runContextProperty = new RunContextProperty<>(Property.<Map<String, String>>builder().expression("{ \"key\": \"{{ key }}\"}").build(), runContext);
         assertThat(runContextProperty.asMap(String.class, String.class, Map.of("key", "value"))).containsEntry("key", "value");
     }

+    @Test
+    void asShouldReturnCachedRenderedProperty() throws IllegalVariableEvaluationException {
+        var runContext = runContextFactory.of();
+
+        var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
+
+        assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
+        assertThat(runContextProperty.as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value1"));
+    }
+
+    @Test
+    void asShouldNotReturnCachedRenderedPropertyWithSkipCache() throws IllegalVariableEvaluationException {
+        var runContext = runContextFactory.of();
+
+        var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
+
+        assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
+        assertThat(runContextProperty.skipCache().as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value2"));
+    }
 }
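These two tests pin down render-once semantics: the first rendered value is cached on the property, and `skipCache()` forces a fresh render. A toy sketch of that memoization, assuming a hypothetical class with a trivial map lookup in place of real template rendering:

import java.util.Map;
import java.util.Optional;

public class CachedPropertySketch {
    private final String expression;
    private String cached;   // first successful render is memoized
    private boolean skipCache;

    CachedPropertySketch(String expression) {
        this.expression = expression;
    }

    CachedPropertySketch skipCache() {
        this.skipCache = true;
        return this;
    }

    // Stand-in for template rendering: strip the braces and look the name up.
    private String render(Map<String, String> vars) {
        return vars.getOrDefault(expression.replaceAll("[{} ]", ""), expression);
    }

    Optional<String> as(Map<String, String> vars) {
        if (!skipCache && cached != null) {
            return Optional.of(cached); // later calls ignore the new variables
        }
        cached = render(vars);
        return Optional.of(cached);
    }

    public static void main(String[] args) {
        var p = new CachedPropertySketch("{{ variable }}");
        System.out.println(p.as(Map.of("variable", "value1")));             // Optional[value1]
        System.out.println(p.as(Map.of("variable", "value2")));             // Optional[value1] (cached)
        System.out.println(p.skipCache().as(Map.of("variable", "value2"))); // Optional[value2]
    }
}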
@@ -140,7 +140,7 @@ class RunContextTest {
         List<LogEntry> logs = new CopyOnWriteArrayList<>();
         Flux<LogEntry> receive = TestsUtils.receive(workerTaskLogQueue, either -> logs.add(either.getLeft()));

-        char[] chars = new char[1024 * 11];
+        char[] chars = new char[1024 * 16];
         Arrays.fill(chars, 'a');

         Map<String, Object> inputs = new HashMap<>(InputsTest.inputs);
@@ -3,7 +3,6 @@ package io.kestra.core.runners;
 import io.kestra.core.models.flows.Flow;
 import io.kestra.core.models.tasks.Task;
-import io.kestra.core.models.triggers.AbstractTrigger;
 import io.micronaut.context.annotation.Property;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;

@@ -0,0 +1,270 @@
+package io.kestra.core.runners;
+
+import io.kestra.core.exceptions.IllegalVariableEvaluationException;
+import io.kestra.core.junit.annotations.KestraTest;
+import io.kestra.core.secret.SecretNotFoundException;
+import io.kestra.core.secret.SecretService;
+import io.micronaut.test.annotation.MockBean;
+import jakarta.inject.Inject;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for SecureVariableRendererFactory.
+ *
+ * This class tests the factory's ability to create debug renderers that:
+ * - Properly mask secret functions
+ * - Maintain security by preventing secret value leakage
+ * - Delegate to the base renderer for non-secret operations
+ * - Handle errors appropriately
+ */
+@KestraTest
+class SecureVariableRendererFactoryTest {
+
+    @Inject
+    private SecureVariableRendererFactory secureVariableRendererFactory;
+
+    @Inject
+    private VariableRenderer renderer;
+
+    @MockBean(SecretService.class)
+    SecretService testSecretService() {
+        return new SecretService() {
+            @Override
+            public String findSecret(String tenantId, String namespace, String key) throws SecretNotFoundException, IOException {
+                return switch (key) {
+                    case "MY_SECRET" -> "my-secret-value-12345";
+                    case "API_KEY" -> "api-key-value-67890";
+                    case "DB_PASSWORD" -> "db-password-secret";
+                    case "TOKEN" -> "token-value-abc123";
+                    case "KEY1" -> "secret-value-1";
+                    case "KEY2" -> "secret-value-2";
+                    case "JSON_SECRET" -> "{\"api_key\": \"secret123\", \"token\": \"token456\"}";
+                    default -> throw new SecretNotFoundException("Secret not found: " + key);
+                };
+            }
+        };
+    }
+
+    @Test
+    void shouldCreateDebugRenderer() {
+        // When
+        VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
+
+        // Then
+        assertThat(debugRenderer).isNotNull();
+    }
+
+    @Test
+    void shouldCreateDebugRendererThatIsNotSameAsBaseRenderer() {
+        // When
+        VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
+
+        // Then
+        assertThat(debugRenderer).isNotSameAs(renderer);
+    }
+
+    @Test
+    void shouldCreateDebugRendererThatMasksSecrets() throws IllegalVariableEvaluationException {
+        // Given
+        VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
+        Map<String, Object> context = Map.of(
+            "flow", Map.of("namespace", "io.kestra.unittest")
+        );
+
+        // When
+        String result = debugRenderer.render("{{ secret('MY_SECRET') }}", context);
+
+        // Then
+        assertThat(result).isEqualTo("******");
+        assertThat(result).doesNotContain("my-secret-value-12345");
+    }
+
+    @Test
+    void shouldCreateDebugRendererThatMasksMultipleSecrets() throws IllegalVariableEvaluationException {
+        // Given
+        VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
+        Map<String, Object> context = Map.of(
+            "flow", Map.of("namespace", "io.kestra.unittest")
+        );
+
+        // When
+        String result = debugRenderer.render(
+            "API: {{ secret('API_KEY') }}, DB: {{ secret('DB_PASSWORD') }}, Token: {{ secret('TOKEN') }}",
+            context
+        );
+
+        // Then
+        assertThat(result).isEqualTo("API: ******, DB: ******, Token: ******");
+        assertThat(result).doesNotContain("api-key-value-67890");
+        assertThat(result).doesNotContain("db-password-secret");
+        assertThat(result).doesNotContain("token-value-abc123");
+    }
+
+    @Test
+    void shouldCreateDebugRendererThatDoesNotMaskNonSecretVariables() throws IllegalVariableEvaluationException {
+        // Given
+        VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
+        Map<String, Object> context = Map.of(
+            "username", "testuser",
+            "email", "test@example.com",
+            "count", 42
+        );
+
+        // When
+        String result = debugRenderer.render(
+            "User: {{ username }}, Email: {{ email }}, Count: {{ count }}",
+            context
+        );
+
+        // Then
+        assertThat(result).isEqualTo("User: testuser, Email: test@example.com, Count: 42");
+    }
+
+    @Test
+    void shouldCreateDebugRendererThatMasksOnlySecretFunctions() throws IllegalVariableEvaluationException {
+        // Given
+        VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
+        Map<String, Object> context = Map.of(
+            "flow", Map.of("namespace", "io.kestra.unittest"),
+            "username", "testuser",
+            "environment", "production"
+        );
+
+        // When
+        String result = debugRenderer.render(
+            "User: {{ username }}, Env: {{ environment }}, Secret: {{ secret('MY_SECRET') }}",
+            context
+        );
+
+        // Then
+        assertThat(result).isEqualTo("User: testuser, Env: production, Secret: ******");
+        assertThat(result).contains("testuser");
+        assertThat(result).contains("production");
+        assertThat(result).doesNotContain("my-secret-value-12345");
+    }
+
+    @Test
+    void shouldCreateDebugRendererThatHandlesMissingSecrets() {
+        // Given
+        VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
+        Map<String, Object> context = Map.of(
+            "flow", Map.of("namespace", "io.kestra.unittest")
+        );
+
+        // When/Then
+        assertThatThrownBy(() -> debugRenderer.render("{{ secret('NON_EXISTENT_SECRET') }}", context))
+            .isInstanceOf(IllegalVariableEvaluationException.class)
+            .hasMessageContaining("Secret not found: NON_EXISTENT_SECRET");
+    }
+
+    @Test
+    void shouldCreateDebugRendererThatMasksSecretsInComplexExpressions() throws IllegalVariableEvaluationException {
+        // Given
+        VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
+        Map<String, Object> context = Map.of(
+            "flow", Map.of("namespace", "io.kestra.unittest")
+        );
+
+        // When
+        String result = debugRenderer.render(
+            "{{ 'API Key: ' ~ secret('API_KEY') }}",
+            context
+        );
+
+        // Then
+        assertThat(result).isEqualTo("API Key: ******");
+        assertThat(result).doesNotContain("api-key-value-67890");
+    }
+
+    @Test
+    void shouldCreateDebugRendererThatMasksSecretsInConditionals() throws IllegalVariableEvaluationException {
+        // Given
+        VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
+        Map<String, Object> context = Map.of(
+            "flow", Map.of("namespace", "io.kestra.unittest")
+        );
+
+        // When
+        String result = debugRenderer.render(
+            "{{ secret('MY_SECRET') is defined ? 'Secret exists' : 'No secret' }}",
+            context
+        );
+
+        // Then
+        assertThat(result).isEqualTo("Secret exists");
+        assertThat(result).doesNotContain("my-secret-value-12345");
+    }
+
+    @Test
+    void shouldCreateDebugRendererThatMasksSecretsWithSubkeys() throws IllegalVariableEvaluationException {
+        // Given
+        VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
+        Map<String, Object> context = Map.of(
+            "flow", Map.of("namespace", "io.kestra.unittest")
+        );
+
+        // When
+        String result = debugRenderer.render(
+            "{{ secret('JSON_SECRET', subkey='api_key') }}",
+            context
+        );
+
+        // Then
+        assertThat(result).isEqualTo("******");
+        assertThat(result).doesNotContain("secret123");
+    }
+
+    @Test
+    void shouldCreateDebugRendererThatHandlesEmptyContext() throws IllegalVariableEvaluationException {
+        // Given
+        VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
+        Map<String, Object> emptyContext = Map.of();
+
+        // When
+        String result = debugRenderer.render("Hello World", emptyContext);
+
+        // Then
+        assertThat(result).isEqualTo("Hello World");
+    }
+
+    @Test
+    void shouldCreateDebugRendererThatHandlesNullValues() throws IllegalVariableEvaluationException {
+        // Given
+        VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
+        Map<String, Object> context = Map.of(
+            "value", "test"
+        );
+
+        // When
+        String result = debugRenderer.render("{{ value }}", context);
+
+        // Then
+        assertThat(result).isEqualTo("test");
+    }
+
+    @Test
+    void shouldCreateDebugRendererThatMasksSecretsInNestedRender() throws IllegalVariableEvaluationException {
+        // Given
+        VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
+        Map<String, Object> context = Map.of(
+            "flow", Map.of("namespace", "io.kestra.unittest")
+        );
+
+        // When - Using concatenation to avoid immediate evaluation
+        String result = debugRenderer.render(
+            "{{ render('{{s'~'ecret(\"MY_SECRET\")}}') }}",
+            context
+        );
+
+        // Then
+        assertThat(result).isEqualTo("******");
+        assertThat(result).doesNotContain("my-secret-value-12345");
+    }
+}
@@ -5,6 +5,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
 import io.kestra.core.models.annotations.Plugin;
 import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.flows.State;
+import io.kestra.core.models.property.Property;
 import io.kestra.core.models.tasks.RunnableTask;
 import io.kestra.core.models.tasks.Task;
 import io.kestra.core.queues.QueueException;
@@ -18,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;

+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;

@@ -77,8 +79,12 @@ public class TaskCacheTest {
     @Plugin
     public static class CounterTask extends Task implements RunnableTask<CounterTask.Output> {

+        private String workingDir;
+
         @Override
         public Output run(RunContext runContext) throws Exception {
+            Map<String, Object> variables = Map.of("workingDir", runContext.workingDir().path().toString());
+            runContext.render(this.workingDir, variables);
             return Output.builder()
                 .counter(COUNTER.incrementAndGet())
                 .build();
@@ -6,6 +6,8 @@ import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.flows.State;
 import org.junit.jupiter.api.Test;

+import java.util.List;
+
 import static org.assertj.core.api.Assertions.assertThat;

 @KestraTest(startRunner = true)
@@ -31,4 +33,15 @@ class TaskWithRunIfTest {
         assertThat(execution.findTaskRunsByTaskId("log_test").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
     }

+    @Test
+    @ExecuteFlow("flows/valids/task-runif-executionupdating.yml")
+    void executionUpdatingTask(Execution execution) {
+        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(execution.getTaskRunList()).hasSize(5);
+        assertThat(execution.findTaskRunsByTaskId("skipSetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
+        assertThat(execution.findTaskRunsByTaskId("skipUnsetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
+        assertThat(execution.findTaskRunsByTaskId("unsetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(execution.findTaskRunsByTaskId("setVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(execution.getVariables()).containsEntry("list", List.of(42));
+    }
 }
Some files were not shown because too many files have changed in this diff.