Mirror of https://github.com/kestra-io/kestra.git (synced 2025-12-26 14:00:23 -05:00)

Compare commits: dependabot...v0.24.8 (140 commits)
| SHA1 |
|---|
| cd080831c5 |
| 8282bf7940 |
| 0c494ebeef |
| cc58a765e4 |
| cbcd76416a |
| e98732e121 |
| 19e265ee1a |
| d7fbac8132 |
| 430bac5039 |
| 039ccf827a |
| e15287a79b |
| 798ed061d4 |
| 9717faaade |
| 631f0c5cdb |
| bbf619246e |
| cb307780e4 |
| 95f2bbea1d |
| 5cb9a340a7 |
| c5ccae287e |
| 40a9732dac |
| ae7aefcc17 |
| 0a81f67981 |
| c4757ed915 |
| 4577070e32 |
| 7bd519ddb4 |
| 62c85078b6 |
| 3718be9658 |
| 2df6c1b730 |
| 9af86ea677 |
| 8601905994 |
| 9de1a15d02 |
| 25b056ebb3 |
| 87d8f9867f |
| a00c1f8397 |
| f4470095ff |
| cbfaa8815d |
| 10e55bbb77 |
| 59d5d4cb91 |
| e8ee3b0a84 |
| 602ff849e3 |
| 155bdca83f |
| faaaeada3a |
| 6ef35974d7 |
| 46f9bb768f |
| ab87f63e8c |
| cdb73ccbd7 |
| 8fc936e0a3 |
| 1e0ebc94b8 |
| 5318592eff |
| 2da08f160d |
| 8cbc9e7aff |
| f8e15d103f |
| 49794a4f2a |
| bafa5fe03c |
| 208b244f0f |
| b93976091d |
| eec52d76f0 |
| b96fd87572 |
| 1aa5bfab43 |
| c4572e86a5 |
| f2f97bb70c |
| 804c740d3c |
| 75cd4f44e0 |
| f167a2a2bb |
| 08d9416e3a |
| 2a879c617c |
| 3227ca7c11 |
| 428a52ce02 |
| f58bc4caba |
| e99ae9513f |
| c8b51fcacf |
| 813b2f6439 |
| c6b5bca25b |
| de35d2cdb9 |
| a6ffbd59d0 |
| 568740a214 |
| aa0d2c545f |
| cda77d5146 |
| d4fd1f61ba |
| 9859ea5eb6 |
| aca374a28f |
| c413ba95e1 |
| 9c6b92619e |
| 8173e8df51 |
| 5c95505911 |
| 33f0b533bb |
| 23e35a7f97 |
| 0357321c58 |
| 5c08403398 |
| a63cb71218 |
| 317885b91c |
| 87637302e4 |
| 056faaaf9f |
| 54c74a1328 |
| fae0c88c5e |
| db5d83d1cb |
| 066b947762 |
| b6597475b1 |
| f2610baf15 |
| b619bf76d8 |
| 117f453a77 |
| 053d6276ff |
| 3870eca70b |
| afd7c216f9 |
| 59a17e88e7 |
| 99f8dca1c2 |
| 1068c9fe51 |
| ea6d30df7c |
| 04ba7363c2 |
| 281a987944 |
| c9ce54b0be |
| ccd9baef3c |
| 97869b9c75 |
| 1c681c1492 |
| de2a446f93 |
| d778947017 |
| 3f97845fdd |
| 631cd169a1 |
| 1648fa076c |
| 474806882e |
| 65467bd118 |
| 387bbb80ac |
| 19d4c64f19 |
| 809c0a228c |
| 6a045900fb |
| 4ada5fe8f3 |
| 998087ca30 |
| 146338e48f |
| de177b925e |
| 04bfb19095 |
| c913c48785 |
| 0d5b593d42 |
| 83f92535c5 |
| fd6a0a6c11 |
| 104c4c97b4 |
| 21cd21269f |
| 679befa2fe |
| 8a0ecdeb8a |
| ee8762e138 |
| d16324f265 |
8  .github/workflows/auto-translate-ui-keys.yml (vendored)

@@ -2,7 +2,7 @@ name: Auto-Translate UI keys and create PR
 on:
   schedule:
-    - cron: "0 9-21 * * *" # Every hour from 9 AM to 9 PM
+    - cron: "0 9-21/3 * * *" # Every 3 hours from 9 AM to 9 PM
   workflow_dispatch:
     inputs:
       retranslate_modified_keys:
@@ -20,13 +20,13 @@ jobs:
     runs-on: ubuntu-latest
     timeout-minutes: 10
     steps:
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        name: Checkout
        with:
          fetch-depth: 0

       - name: Set up Python
-        uses: actions/setup-python@v5
+        uses: actions/setup-python@v6
        with:
          python-version: "3.x"

@@ -39,7 +39,7 @@ jobs:
       OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

       - name: Set up Node
-        uses: actions/setup-node@v4
+        uses: actions/setup-node@v5
        with:
          node-version: "20.x"
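Note: the schedule change above relies on standard cron step syntax; `9-21/3` takes every third hour of the 9-21 range. A quick reference (GitHub Actions evaluates cron in UTC):

# 0 9-21 * * *    -> minute 0 of every hour from 09:00 to 21:00 (13 runs/day)
# 0 9-21/3 * * *  -> 09:00, 12:00, 15:00, 18:00, 21:00 only (5 runs/day)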
4  .github/workflows/codeql-analysis.yml (vendored)

@@ -27,7 +27,7 @@ jobs:
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
        with:
          # We must fetch at least the immediate parents so that if this is
          # a pull request then we can checkout the head.
@@ -50,7 +50,7 @@ jobs:
       # Set up JDK
       - name: Set up JDK
-        uses: actions/setup-java@v4
+        uses: actions/setup-java@v5
        if: ${{ matrix.language == 'java' }}
        with:
          distribution: 'temurin'
147  .github/workflows/docker.yml (vendored, deleted)

@@ -1,147 +0,0 @@
-name: Create Docker images on Release
-
-on:
-  workflow_dispatch:
-    inputs:
-      retag-latest:
-        description: 'Retag latest Docker images'
-        required: true
-        type: string
-        default: "false"
-        options:
-          - "true"
-          - "false"
-      release-tag:
-        description: 'Kestra Release Tag'
-        required: false
-        type: string
-      plugin-version:
-        description: 'Plugin version'
-        required: false
-        type: string
-        default: "LATEST"
-env:
-  PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
-jobs:
-  plugins:
-    name: List Plugins
-    runs-on: ubuntu-latest
-    outputs:
-      plugins: ${{ steps.plugins.outputs.plugins }}
-    steps:
-      # Checkout
-      - uses: actions/checkout@v4
-
-      # Get Plugins List
-      - name: Get Plugins List
-        uses: ./.github/actions/plugins-list
-        id: plugins
-        with:
-          plugin-version: ${{ env.PLUGIN_VERSION }}
-  docker:
-    name: Publish Docker
-    needs: [ plugins ]
-    runs-on: ubuntu-latest
-    strategy:
-      matrix:
-        image:
-          - name: "-no-plugins"
-            plugins: ""
-            packages: jattach
-            python-libs: ""
-          - name: ""
-            plugins: ${{needs.plugins.outputs.plugins}}
-            packages: python3 python-is-python3 python3-pip curl jattach
-            python-libs: kestra
-    steps:
-      - uses: actions/checkout@v4
-
-      # Vars
-      - name: Set image name
-        id: vars
-        run: |
-          if [[ "${{ inputs.release-tag }}" == "" ]]; then
-            TAG=${GITHUB_REF#refs/*/}
-            echo "tag=${TAG}" >> $GITHUB_OUTPUT
-          else
-            TAG="${{ inputs.release-tag }}"
-            echo "tag=${TAG}" >> $GITHUB_OUTPUT
-          fi
-
-          if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
-            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
-          else
-            echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
-          fi
-      # Download release
-      - name: Download release
-        uses: robinraju/release-downloader@v1.12
-        with:
-          tag: ${{steps.vars.outputs.tag}}
-          fileName: 'kestra-*'
-          out-file-path: build/executable
-
-      - name: Copy exe to image
-        run: |
-          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
-
-      # Docker setup
-      - name: Set up QEMU
-        uses: docker/setup-qemu-action@v3
-
-      - name: Docker - Fix Qemu
-        shell: bash
-        run: |
-          docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
-
-      - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@v3
-
-      # Docker Login
-      - name: Login to DockerHub
-        uses: docker/login-action@v3
-        with:
-          username: ${{ secrets.DOCKERHUB_USERNAME }}
-          password: ${{ secrets.DOCKERHUB_PASSWORD }}
-
-      # Docker Build and push
-      - name: Push to Docker Hub
-        uses: docker/build-push-action@v6
-        with:
-          context: .
-          push: true
-          tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
-          platforms: linux/amd64,linux/arm64
-          build-args: |
-            KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
-            APT_PACKAGES=${{ matrix.image.packages }}
-            PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
-
-      - name: Install regctl
-        if: github.event.inputs.retag-latest == 'true'
-        uses: regclient/actions/regctl-installer@main
-
-      - name: Retag to latest
-        if: github.event.inputs.retag-latest == 'true'
-        run: |
-          regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
-
-  end:
-    runs-on: ubuntu-latest
-    needs:
-      - docker
-    if: always()
-    env:
-      SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
-    steps:
-
-      # Slack
-      - name: Slack notification
-        uses: Gamesight/slack-workflow-status@master
-        if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
-        with:
-          repo_token: ${{ secrets.GITHUB_TOKEN }}
-          slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
-          name: GitHub Actions
-          icon_emoji: ':github-actions:'
-          channel: 'C02DQ1A7JLR' # _int_git channel
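Note: the `regctl image copy` retag used above (and in the replacement workflow further down) copies manifests registry-side, so a retag needs no local pull, rebuild, or layer re-push. A minimal sketch with an illustrative tag:

regctl image copy kestra/kestra:v0.24.8 kestra/kestra:latest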
6  .github/workflows/e2e.yml (vendored)

@@ -19,7 +19,7 @@ on:
         default: "no input"
 jobs:
   check:
-    timeout-minutes: 10
+    timeout-minutes: 15
    runs-on: ubuntu-latest
    env:
      GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
@@ -32,12 +32,12 @@ jobs:
           password: ${{ github.token }}

       - name: Checkout kestra
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
        with:
          path: kestra

       # Setup build
-      - uses: kestra-io/actions/.github/actions/setup-build@main
+      - uses: kestra-io/actions/composite/setup-build@main
        name: Setup - Build
        id: build
        with:
14  .github/workflows/gradle-release-plugins.yml (vendored)

@@ -4,7 +4,7 @@ on:
   workflow_dispatch:
     inputs:
       releaseVersion:
-        description: 'The release version (e.g., 0.21.0-rc1)'
+        description: 'The release version (e.g., 0.21.0)'
        required: true
        type: string
      nextVersion:
@@ -21,25 +21,17 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          fetch-depth: 0

-      # Checkout GitHub Actions
-      - uses: actions/checkout@v4
-        with:
-          repository: kestra-io/actions
-          path: actions
-          ref: main
-
       # Setup build
-      - uses: ./actions/.github/actions/setup-build
+      - uses: kestra-io/actions/composite/setup-build@main
        id: build
        with:
          java-enabled: true
          node-enabled: true
          python-enabled: true
-          caches-enabled: true

       # Get Plugins List
       - name: Get Plugins List
18  .github/workflows/gradle-release.yml (vendored)

@@ -4,7 +4,7 @@ on:
   workflow_dispatch:
     inputs:
       releaseVersion:
-        description: 'The release version (e.g., 0.21.0-rc1)'
+        description: 'The release version (e.g., 0.21.0)'
        required: true
        type: string
      nextVersion:
@@ -23,8 +23,8 @@ jobs:
       # Checks
       - name: Check Inputs
         run: |
-          if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$ ]]; then
-            echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$"
+          if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0$ ]]; then
+            echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0$"
            exit 1
          fi

@@ -33,20 +33,13 @@ jobs:
           exit 1;
         fi
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          fetch-depth: 0
          path: kestra

-      # Checkout GitHub Actions
-      - uses: actions/checkout@v4
-        with:
-          repository: kestra-io/actions
-          path: actions
-          ref: main
-
       # Setup build
-      - uses: ./actions/.github/actions/setup-build
+      - uses: kestra-io/actions/composite/setup-build@main
        id: build
        with:
          java-enabled: true
@@ -78,7 +71,6 @@ jobs:
           git checkout develop;

           if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
-            # -SNAPSHOT qualifier maybe used to test release-candidates
            ./gradlew release -Prelease.useAutomaticVersion=true \
              -Prelease.releaseVersion="${RELEASE_VERSION}" \
              -Prelease.newVersion="${NEXT_VERSION}" \
33  .github/workflows/main.yml (vendored)

@@ -3,10 +3,17 @@ name: Main Workflow
 on:
   workflow_dispatch:
     inputs:
+      skip-test:
+        description: 'Skip test'
+        type: choice
+        required: true
+        default: 'false'
+        options:
+          - "true"
+          - "false"
       plugin-version:
-        description: "Kestra version"
-        default: 'LATEST'
-        required: true
+        description: "plugins version"
+        required: false
        type: string
   push:
     branches:
@@ -25,16 +32,17 @@ jobs:
   tests:
     name: Execute tests
     uses: ./.github/workflows/workflow-test.yml
+    if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
    with:
      report-status: false

   release:
     name: Release
     needs: [tests]
-    if: "!startsWith(github.ref, 'refs/heads/releases')"
+    if: "!failure() && !cancelled() && !startsWith(github.ref, 'refs/heads/releases')"
    uses: ./.github/workflows/workflow-release.yml
    with:
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
+      plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
    secrets:
      DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
      DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
@@ -43,14 +51,14 @@ jobs:
       SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
       SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
       SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
+      GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
+      SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}

   end:
     runs-on: ubuntu-latest
     needs:
       - release
     if: always()
-    env:
-      SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
    steps:
      - name: Trigger EE Workflow
        uses: peter-evans/repository-dispatch@v3
@@ -60,14 +68,9 @@ jobs:
           repository: kestra-io/kestra-ee
           event-type: "oss-updated"

       # Slack
       - name: Slack - Notification
-        uses: Gamesight/slack-workflow-status@master
-        if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
+        if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
+        uses: kestra-io/actions/composite/slack-status@main
        with:
-          repo_token: ${{ secrets.GITHUB_TOKEN }}
-          slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
-          name: GitHub Actions
-          icon_emoji: ":github-actions:"
-          channel: "C02DQ1A7JLR" # _int_git channel
+          webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
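Note: the new `plugin-version` expression uses the usual GitHub Actions stand-in for a ternary, since the expression language has no `?:` operator. A sketch of the pattern:

# `a && b || c` yields b when a is truthy, otherwise c
# (caveat: it falls through to c whenever b itself is falsy, e.g. an empty string)
plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || 'LATEST' }}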
17  .github/workflows/pull-request.yml (vendored)

@@ -4,6 +4,7 @@ on:
   pull_request:
     branches:
       - develop
+      - releases/*

 concurrency:
   group: ${{ github.workflow }}-${{ github.ref_name }}-pr
@@ -60,19 +61,3 @@ jobs:
     name: E2E - Tests
     uses: ./.github/workflows/e2e.yml
-
-  end:
-    name: End
-    runs-on: ubuntu-latest
-    if: always()
-    needs: [frontend, backend]
-    steps:
-      # Slack
-      - name: Slack notification
-        uses: Gamesight/slack-workflow-status@master
-        if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
-        with:
-          repo_token: ${{ secrets.GITHUB_TOKEN }}
-          slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
-          name: GitHub Actions
-          icon_emoji: ":github-actions:"
-          channel: "C02DQ1A7JLR"
2  .github/workflows/setversion-tag-plugins.yml (vendored)

@@ -17,7 +17,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          fetch-depth: 0
7  .github/workflows/setversion-tag.yml (vendored)

@@ -34,11 +34,14 @@ jobs:
         fi

       # Checkout
-      - uses: actions/checkout@v4
+      - name: Checkout
+        uses: actions/checkout@v5
        with:
          fetch-depth: 0
+          token: ${{ secrets.GH_PERSONAL_TOKEN }}

-      - name: Configure Git
+      # Configure
+      - name: Git - Configure
        run: |
          git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
          git config --global user.name "github-actions[bot]"
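Note: passing a personal access token to the checkout is presumably so the tag this workflow pushes can trigger other workflows; pushes made with the default `GITHUB_TOKEN` deliberately do not start new workflow runs. A minimal sketch of the pattern:

- name: Checkout
  uses: actions/checkout@v5
  with:
    fetch-depth: 0
    token: ${{ secrets.GH_PERSONAL_TOKEN }}  # PAT, so the later git push fires downstream workflows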
36  .github/workflows/vulnerabilities-check.yml (vendored)

@@ -17,17 +17,10 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          fetch-depth: 0

-      # Checkout GitHub Actions
-      - uses: actions/checkout@v4
-        with:
-          repository: kestra-io/actions
-          path: actions
-          ref: main
-
       # Setup build
       - uses: ./actions/.github/actions/setup-build
         id: build
@@ -66,19 +59,12 @@ jobs:
       actions: read
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          fetch-depth: 0

-      # Checkout GitHub Actions
-      - uses: actions/checkout@v4
-        with:
-          repository: kestra-io/actions
-          path: actions
-          ref: main
-
       # Setup build
-      - uses: ./actions/.github/actions/setup-build
+      - uses: kestra-io/actions/composite/setup-build@main
        id: build
        with:
          java-enabled: false
@@ -87,7 +73,7 @@ jobs:
       # Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
       - name: Docker Vulnerabilities Check
-        uses: aquasecurity/trivy-action@0.32.0
+        uses: aquasecurity/trivy-action@0.33.1
        with:
          image-ref: kestra/kestra:develop
          format: 'template'
@@ -111,28 +97,20 @@ jobs:
       actions: read
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          fetch-depth: 0

-      # Checkout GitHub Actions
-      - uses: actions/checkout@v4
-        with:
-          repository: kestra-io/actions
-          path: actions
-          ref: main
-
       # Setup build
-      - uses: ./actions/.github/actions/setup-build
+      - uses: kestra-io/actions/composite/setup-build@main
        id: build
        with:
          java-enabled: false
          node-enabled: false
-          caches-enabled: true

       # Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
       - name: Docker Vulnerabilities Check
-        uses: aquasecurity/trivy-action@0.32.0
+        uses: aquasecurity/trivy-action@0.33.1
        with:
          image-ref: kestra/kestra:latest
          format: table
88  .github/workflows/workflow-backend-test.yml (vendored)

@@ -20,6 +20,7 @@ permissions:
   contents: write
   checks: write
   actions: read
+  pull-requests: write

 jobs:
   test:
@@ -29,13 +30,13 @@ jobs:
       GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
       SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
     steps:
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        name: Checkout - Current ref
        with:
          fetch-depth: 0

       # Setup build
-      - uses: kestra-io/actions/.github/actions/setup-build@main
+      - uses: kestra-io/actions/composite/setup-build@main
        name: Setup - Build
        id: build
        with:
@@ -59,84 +60,15 @@ jobs:
           export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
           ./gradlew check javadoc --parallel

-      # report test
-      - name: Test - Publish Test Results
-        uses: dorny/test-reporter@v2
-        if: always()
-        with:
-          name: Java Tests Report
-          reporter: java-junit
-          path: '**/build/test-results/test/TEST-*.xml'
-          list-suites: 'failed'
-          list-tests: 'failed'
-          fail-on-error: 'false'
-          token: ${{ secrets.GITHUB_AUTH_TOKEN }}
-
-      # Sonar
-      - name: Test - Analyze with Sonar
-        if: env.SONAR_TOKEN != ''
+      - name: comment PR with test report
+        if: ${{ !cancelled() && github.event_name == 'pull_request' }}
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
-          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
-        shell: bash
-        run: ./gradlew sonar --info
+        run: npx --yes @kestra-io/kestra-devtools generateTestReportSummary --only-errors --ci $(pwd)

-      # GCP
-      - name: GCP - Auth with unit test account
-        id: auth
-        if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
-        continue-on-error: true
-        uses: "google-github-actions/auth@v2"
-        with:
-          credentials_json: "${{ secrets.GOOGLE_SERVICE_ACCOUNT }}"
-
-      - name: GCP - Setup Cloud SDK
-        if: env.GOOGLE_SERVICE_ACCOUNT != ''
-        uses: "google-github-actions/setup-gcloud@v2"
-
-      # Allure check
-      - uses: rlespinasse/github-slug-action@v5
-        name: Allure - Generate slug variables
-
-      - name: Allure - Publish report
-        uses: andrcuns/allure-publish-action@v2.9.0
-        if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
-        continue-on-error: true
-        env:
-          GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
-          JAVA_HOME: /usr/lib/jvm/default-jvm/
-        with:
-          storageType: gcs
-          resultsGlob: "**/build/allure-results"
-          bucket: internal-kestra-host
-          baseUrl: "https://internal.dev.kestra.io"
-          prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
-          copyLatest: true
-          ignoreMissingResults: true
-
-      # Jacoco
-      - name: Jacoco - Copy reports
-        if: env.GOOGLE_SERVICE_ACCOUNT != ''
-        continue-on-error: true
-        shell: bash
-        run: |
-          mv build/reports/jacoco/testCodeCoverageReport build/reports/jacoco/test/
-          mv build/reports/jacoco/test/testCodeCoverageReport.xml build/reports/jacoco/test/jacocoTestReport.xml
-          gsutil -m rsync -d -r build/reports/jacoco/test/ gs://internal-kestra-host/${{ format('{0}/{1}', github.repository, 'jacoco') }}
-
-      # Codecov
-      - name: Codecov - Upload coverage reports
-        uses: codecov/codecov-action@v5
+      # Report Java
+      - name: Report - Java
+        uses: kestra-io/actions/composite/report-java@main
        if: ${{ !cancelled() }}
-        continue-on-error: true
        with:
-          token: ${{ secrets.CODECOV_TOKEN }}
-          flags: backend
-
-      - name: Codecov - Upload test results
-        uses: codecov/test-results-action@v1
-        if: ${{ !cancelled() }}
-        continue-on-error: true
-        with:
-          token: ${{ secrets.CODECOV_TOKEN }}
-          flags: backend
+          secrets: ${{ toJSON(secrets) }}
78  .github/workflows/workflow-build-artifacts.yml (vendored)

@@ -1,23 +1,7 @@
 name: Build Artifacts

 on:
-  workflow_call:
-    inputs:
-      plugin-version:
-        description: "Kestra version"
-        default: 'LATEST'
-        required: true
-        type: string
-    outputs:
-      docker-tag:
-        value: ${{ jobs.build.outputs.docker-tag }}
-        description: "The Docker image Tag for Kestra"
-      docker-artifact-name:
-        value: ${{ jobs.build.outputs.docker-artifact-name }}
-        description: "The GitHub artifact containing the Kestra docker image name."
-      plugins:
-        value: ${{ jobs.build.outputs.plugins }}
-        description: "The Kestra plugins list used for the build."
+  workflow_call: {}

 jobs:
   build:
@@ -31,7 +15,7 @@ jobs:
       PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
     steps:
       - name: Checkout - Current ref
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
        with:
          fetch-depth: 0

@@ -42,7 +26,7 @@ jobs:
         run: npm ci

       # Setup build
-      - uses: kestra-io/actions/.github/actions/setup-build@main
+      - uses: kestra-io/actions/composite/setup-build@main
        name: Setup - Build
        id: build
        with:
@@ -82,55 +66,6 @@ jobs:
         run: |
           cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra

-      # Docker Tag
-      - name: Setup - Docker vars
-        id: vars
-        shell: bash
-        run: |
-          TAG=${GITHUB_REF#refs/*/}
-          if [[ $TAG = "master" ]]
-          then
-            TAG="latest";
-          elif [[ $TAG = "develop" ]]
-          then
-            TAG="develop";
-          elif [[ $TAG = v* ]]
-          then
-            TAG="${TAG}";
-          else
-            TAG="build-${{ github.run_id }}";
-          fi
-          echo "tag=${TAG}" >> $GITHUB_OUTPUT
-          echo "artifact=docker-kestra-${TAG}" >> $GITHUB_OUTPUT
-
-      # Docker setup
-      - name: Docker - Setup QEMU
-        uses: docker/setup-qemu-action@v3
-
-      - name: Docker - Fix Qemu
-        shell: bash
-        run: |
-          docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
-
-      - name: Docker - Setup Buildx
-        uses: docker/setup-buildx-action@v3
-
-      # Docker Build
-      - name: Docker - Build & export image
-        uses: docker/build-push-action@v6
-        if: "!startsWith(github.ref, 'refs/tags/v')"
-        with:
-          context: .
-          push: false
-          file: Dockerfile
-          tags: |
-            kestra/kestra:${{ steps.vars.outputs.tag }}
-          build-args: |
-            KESTRA_PLUGINS=${{ steps.plugins.outputs.plugins }}
-            APT_PACKAGES=${{ env.DOCKER_APT_PACKAGES }}
-            PYTHON_LIBRARIES=${{ env.DOCKER_PYTHON_LIBRARIES }}
-          outputs: type=docker,dest=/tmp/${{ steps.vars.outputs.artifact }}.tar
-
       # Upload artifacts
       - name: Artifacts - Upload JAR
         uses: actions/upload-artifact@v4
@@ -143,10 +78,3 @@ jobs:
        with:
          name: exe
          path: build/executable/
-
-      - name: Artifacts - Upload Docker
-        uses: actions/upload-artifact@v4
-        if: "!startsWith(github.ref, 'refs/tags/v')"
-        with:
-          name: ${{ steps.vars.outputs.artifact }}
-          path: /tmp/${{ steps.vars.outputs.artifact }}.tar
2  .github/workflows/workflow-frontend-test.yml (vendored)

@@ -20,7 +20,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Checkout
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5

       - name: Cache Node Modules
         id: cache-node-modules
31  .github/workflows/workflow-github-release.yml (vendored)

@@ -1,14 +1,17 @@
 name: Github - Release

 on:
+  workflow_dispatch:
   workflow_call:
     secrets:
       GH_PERSONAL_TOKEN:
         description: "The Github personal token."
         required: true
-  push:
-    tags:
-      - '*'
+      SLACK_RELEASES_WEBHOOK_URL:
+        description: "The Slack webhook URL."
+        required: true

 jobs:
   publish:
@@ -17,25 +20,16 @@ jobs:
     steps:
       # Check out
       - name: Checkout - Repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
        with:
          fetch-depth: 0
          submodules: true

-      # Checkout GitHub Actions
-      - name: Checkout - Actions
-        uses: actions/checkout@v4
-        with:
-          repository: kestra-io/actions
-          sparse-checkout-cone-mode: true
-          path: actions
-          sparse-checkout: |
-            .github/actions
-
       # Download Exec
       # Must be done after checkout actions
       - name: Artifacts - Download executable
-        uses: actions/download-artifact@v4
+        uses: actions/download-artifact@v5
        if: startsWith(github.ref, 'refs/tags/v')
        with:
          name: exe
@@ -56,7 +50,7 @@ jobs:
       # GitHub Release
       - name: Create GitHub release
-        uses: ./actions/.github/actions/github-release
+        uses: kestra-io/actions/composite/github-release@main
        if: ${{ startsWith(github.ref, 'refs/tags/v') }}
        env:
          MAKE_LATEST: ${{ steps.is_latest.outputs.latest }}
@@ -76,3 +70,10 @@ jobs:
             "github_repository": "${{ github.repository }}",
             "github_actor": "${{ github.actor }}"
           }
+
+      - name: Merge Release Notes
+        if: ${{ startsWith(github.ref, 'refs/tags/v') }}
+        uses: kestra-io/actions/composite/github-release-note-merge@main
+        env:
+          GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
+          RELEASE_TAG: ${{ github.ref_name }}
218  .github/workflows/workflow-publish-docker.yml (vendored)

@@ -1,22 +1,45 @@
-name: Publish - Docker
+name: Create Docker images on Release

 on:
   workflow_dispatch:
     inputs:
-      plugin-version:
-        description: "Kestra version"
-        default: 'LATEST'
+      retag-latest:
+        description: 'Retag latest Docker images'
+        required: true
+        type: choice
+        default: "false"
+        options:
+          - "true"
+          - "false"
+      retag-lts:
+        description: 'Retag LTS Docker images'
+        required: true
+        type: choice
+        default: "false"
+        options:
+          - "true"
+          - "false"
+      release-tag:
+        description: 'Kestra Release Tag (by default, deduced with the ref)'
        required: false
        type: string
+      plugin-version:
+        description: 'Plugin version'
+        required: false
+        type: string
+        default: "LATEST"
       force-download-artifact:
         description: 'Force download artifact'
         required: false
-        type: string
+        type: choice
        default: "true"
+        options:
+          - "true"
+          - "false"
   workflow_call:
     inputs:
       plugin-version:
-        description: "Kestra version"
+        description: "Plugin version"
        default: 'LATEST'
        required: false
        type: string
@@ -33,47 +56,93 @@ on:
       description: "The Dockerhub password."
       required: true

+env:
+  PLUGIN_VERSION: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
 jobs:
+  plugins:
+    name: List Plugins
+    runs-on: ubuntu-latest
+    outputs:
+      plugins: ${{ steps.plugins.outputs.plugins }}
+    steps:
+      # Checkout
+      - uses: actions/checkout@v5
+
+      # Get Plugins List
+      - name: Get Plugins List
+        uses: ./.github/actions/plugins-list
+        id: plugins
+        with: # remap LATEST-SNAPSHOT to LATEST
+          plugin-version: ${{ env.PLUGIN_VERSION == 'LATEST-SNAPSHOT' && 'LATEST' || env.PLUGIN_VERSION }}
+
   # ********************************************************************************************************************
   # Build
   # ********************************************************************************************************************
   build-artifacts:
     name: Build Artifacts
-    if: ${{ github.event.inputs.force-download-artifact == 'true' }}
+    if: ${{ inputs.force-download-artifact == 'true' }}
    uses: ./.github/workflows/workflow-build-artifacts.yml
-    with:
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
-  # ********************************************************************************************************************
-  # Docker
-  # ********************************************************************************************************************
-  publish:
-    name: Publish - Docker
+  docker:
+    name: Publish Docker
+    needs: [ plugins, build-artifacts ]
+    if: always()
    runs-on: ubuntu-latest
-    needs: build-artifacts
-    if: |
-      always() &&
-      (needs.build-artifacts.result == 'success' ||
-      github.event.inputs.force-download-artifact != 'true')
-    env:
-      PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
    strategy:
      matrix:
        image:
-          - tag: -no-plugins
+          - name: "-no-plugins"
+            plugins: ""
            packages: jattach
-            plugins: false
-            python-libraries: ""
-          - tag: ""
-            plugins: true
-            packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
-            python-libraries: kestra
+            python-libs: ""
+          - name: ""
+            plugins: ${{needs.plugins.outputs.plugins}}
+            packages: python3 python-is-python3 python3-pip curl jattach
+            python-libs: kestra
    steps:
-      - name: Checkout - Current ref
-        uses: actions/checkout@v4
+      - uses: actions/checkout@v5
+
+      # Vars
+      - name: Set image name
+        id: vars
+        run: |
+          if [[ "${{ inputs.release-tag }}" == "" ]]; then
+            TAG=${GITHUB_REF#refs/*/}
+            echo "tag=${TAG}" >> $GITHUB_OUTPUT
+          else
+            TAG="${{ inputs.release-tag }}"
+            echo "tag=${TAG}" >> $GITHUB_OUTPUT
+          fi
+
+          if [[ $GITHUB_REF == refs/tags/* ]]; then
+            if [[ $TAG =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
+              # this will remove the patch version number
+              MINOR_SEMVER=${TAG%.*}
+              echo "minor_semver=${MINOR_SEMVER}" >> $GITHUB_OUTPUT
+            else
+              echo "Tag '$TAG' is not a valid semver (vMAJOR.MINOR.PATCH), skipping minor_semver"
+            fi
+          fi
+
+          if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
+            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
+          else
+            echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
+          fi
+
+      # Download executable from artifact
+      - name: Artifacts - Download executable
+        uses: actions/download-artifact@v5
+        with:
+          name: exe
+          path: build/executable
+
+      - name: Copy exe to image
+        run: |
+          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra

       # Docker setup
-      - name: Docker - Setup QEMU
+      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3

       - name: Docker - Fix Qemu
@@ -81,66 +150,59 @@ jobs:
         run: |
           docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes

-      - name: Docker - Setup Docker Buildx
+      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

       # Docker Login
-      - name: Docker - Login to DockerHub
+      - name: Login to DockerHub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_PASSWORD }}

-      # # Get Plugins List
-      - name: Plugins - Get List
-        uses: ./.github/actions/plugins-list
-        id: plugins-list
-        if: ${{ matrix.image.plugins}}
-        with:
-          plugin-version: ${{ env.PLUGIN_VERSION }}
-
-      # Vars
-      - name: Docker - Set variables
-        shell: bash
-        id: vars
-        run: |
-          TAG=${GITHUB_REF#refs/*/}
-          PLUGINS="${{ matrix.image.plugins == true && steps.plugins-list.outputs.plugins || '' }}"
-          if [[ $TAG == v* ]]; then
-            TAG="${TAG}";
-            echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
-          elif [[ $TAG = "develop" ]]; then
-            TAG="develop";
-            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
-          else
-            TAG="build-${{ github.run_id }}";
-            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
-          fi
-
-          echo "tag=${TAG}${{ matrix.image.tag }}" >> $GITHUB_OUTPUT
-
-      # Build Docker Image
-      - name: Artifacts - Download executable
-        uses: actions/download-artifact@v4
-        with:
-          name: exe
-          path: build/executable
-
-      - name: Docker - Copy exe to image
-        shell: bash
-        run: |
-          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
-
       # Docker Build and push
-      - name: Docker - Build image
+      - name: Push to Docker Hub
        uses: docker/build-push-action@v6
        with:
          context: .
          push: true
-          tags: kestra/kestra:${{ steps.vars.outputs.tag }}
+          tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
          platforms: linux/amd64,linux/arm64
          build-args: |
            KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
            APT_PACKAGES=${{ matrix.image.packages }}
-            PYTHON_LIBRARIES=${{ matrix.image.python-libraries }}
+            PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
+
+      - name: Install regctl
+        if: startsWith(github.ref, 'refs/tags/v')
+        uses: regclient/actions/regctl-installer@main
+
+      - name: Retag to minor semver version
+        if: startsWith(github.ref, 'refs/tags/v') && steps.vars.outputs.minor_semver != ''
+        run: |
+          regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.minor_semver, matrix.image.name) }}
+
+      - name: Retag to latest
+        if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-latest == 'true'
+        run: |
+          regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
+
+      - name: Retag to LTS
+        if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-lts == 'true'
+        run: |
+          regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest-lts{0}', matrix.image.name) }}
+
+  end:
+    runs-on: ubuntu-latest
+    needs:
+      - docker
+    if: always()
+    env:
+      SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
+    steps:
+      - name: Slack notification
+        if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 }}
+        uses: kestra-io/actions/composite/slack-status@main
+        with:
+          webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
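Note: the minor-semver retag above leans on plain bash parameter expansion; `${TAG%.*}` strips the shortest suffix matching `.*`, i.e. the patch component:

TAG=v0.24.8
echo "${TAG%.*}"   # prints: v0.24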
4  .github/workflows/workflow-publish-maven.yml (vendored)

@@ -25,11 +25,11 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Checkout - Current ref
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5

       # Setup build
       - name: Setup - Build
-        uses: kestra-io/actions/.github/actions/setup-build@main
+        uses: kestra-io/actions/composite/setup-build@main
        id: build
        with:
          java-enabled: true
16  .github/workflows/workflow-pullrequest-delete-docker.yml (vendored, new file)

@@ -0,0 +1,16 @@
+name: Pull Request - Delete Docker
+
+on:
+  pull_request:
+    types: [closed]
+
+jobs:
+  publish:
+    name: Pull Request - Delete Docker
+    if: github.repository == 'kestra-io/kestra' # prevent running on forks
+    runs-on: ubuntu-latest
+    steps:
+      - uses: dataaxiom/ghcr-cleanup-action@v1
+        with:
+          package: kestra-pr
+          delete-tags: ${{ github.event.pull_request.number }}
78  .github/workflows/workflow-pullrequest-publish-docker.yml (vendored, new file)

@@ -0,0 +1,78 @@
+name: Pull Request - Publish Docker
+
+on:
+  pull_request:
+    branches:
+      - develop
+
+jobs:
+  build-artifacts:
+    name: Build Artifacts
+    if: github.repository == 'kestra-io/kestra' # prevent running on forks
+    uses: ./.github/workflows/workflow-build-artifacts.yml
+
+  publish:
+    name: Publish Docker
+    if: github.repository == 'kestra-io/kestra' # prevent running on forks
+    runs-on: ubuntu-latest
+    needs: build-artifacts
+    env:
+      GITHUB_IMAGE_PATH: "ghcr.io/kestra-io/kestra-pr"
+    steps:
+      - name: Checkout - Current ref
+        uses: actions/checkout@v5
+        with:
+          fetch-depth: 0
+
+      # Docker setup
+      - name: Docker - Setup QEMU
+        uses: docker/setup-qemu-action@v3
+
+      - name: Docker - Setup Docker Buildx
+        uses: docker/setup-buildx-action@v3
+
+      # Docker Login
+      - name: Login to GHCR
+        uses: docker/login-action@v3
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      # Build Docker Image
+      - name: Artifacts - Download executable
+        uses: actions/download-artifact@v5
+        with:
+          name: exe
+          path: build/executable
+
+      - name: Docker - Copy exe to image
+        shell: bash
+        run: |
+          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
+
+      - name: Docker - Build image
+        uses: docker/build-push-action@v6
+        with:
+          context: .
+          file: ./Dockerfile.pr
+          push: true
+          tags: ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}
+          platforms: linux/amd64,linux/arm64
+
+      # Add comment on pull request
+      - name: Add comment to PR
+        uses: actions/github-script@v8
+        with:
+          github-token: ${{ secrets.GITHUB_TOKEN }}
+          script: |
+            await github.rest.issues.createComment({
+              issue_number: context.issue.number,
+              owner: context.repo.owner,
+              repo: context.repo.repo,
+              body: `**🐋 Docker image**: \`${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}\`\n` +
+                    `\n` +
+                    `\`\`\`bash\n` +
+                    `docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }} server local\n` +
+                    `\`\`\``
+            })
17  .github/workflows/workflow-release.yml (vendored)

@@ -4,7 +4,7 @@ on:
   workflow_dispatch:
     inputs:
       plugin-version:
-        description: "Kestra version"
+        description: "plugins version"
        default: 'LATEST'
        required: false
        type: string
@@ -16,7 +16,7 @@ on:
   workflow_call:
     inputs:
       plugin-version:
-        description: "Kestra version"
+        description: "plugins version"
        default: 'LATEST'
        required: false
        type: string
@@ -42,21 +42,25 @@ on:
       SONATYPE_GPG_FILE:
         description: "The Sonatype GPG file."
         required: true
+      GH_PERSONAL_TOKEN:
+        description: "GH personnal Token."
+        required: true
+      SLACK_RELEASES_WEBHOOK_URL:
+        description: "Slack webhook for releases channel."
+        required: true
 jobs:
   build-artifacts:
     name: Build - Artifacts
     uses: ./.github/workflows/workflow-build-artifacts.yml
-    with:
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}

   Docker:
     name: Publish Docker
     needs: build-artifacts
     uses: ./.github/workflows/workflow-publish-docker.yml
-    if: startsWith(github.ref, 'refs/heads/develop') || github.event.inputs.publish-docker == 'true'
+    if: github.ref == 'refs/heads/develop' || inputs.publish-docker == 'true'
    with:
      force-download-artifact: 'false'
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
+      plugin-version: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
    secrets:
      DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
      DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
@@ -78,3 +82,4 @@ jobs:
     uses: ./.github/workflows/workflow-github-release.yml
     secrets:
       GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
+      SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
.github/workflows/workflow-test.yml (vendored, 16 changed lines)

@@ -27,7 +27,7 @@ jobs:
     outputs:
       ui: ${{ steps.changes.outputs.ui }}
       backend: ${{ steps.changes.outputs.backend }}
     steps:
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        if: "!startsWith(github.ref, 'refs/tags/v')"
       - uses: dorny/paths-filter@v3
        if: "!startsWith(github.ref, 'refs/tags/v')"
@@ -84,14 +84,12 @@ jobs:
     name: Notify - Slack
     runs-on: ubuntu-latest
     needs: [ frontend, backend ]
-    if: github.event_name == 'schedule'
     steps:
       - name: Notify failed CI
-        id: send-ci-failed
         if: |
-          always() && (needs.frontend.result != 'success' ||
-          needs.backend.result != 'success')
-        uses: kestra-io/actions/.github/actions/send-ci-failed@main
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
+          always() &&
+          (needs.frontend.result != 'success' || needs.backend.result != 'success') &&
+          (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop')
+        uses: kestra-io/actions/composite/slack-status@main
+        with:
+          webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
.plugins (5 changed lines)

@@ -87,13 +87,18 @@
 #plugin-powerbi:io.kestra.plugin:plugin-powerbi:LATEST
 #plugin-pulsar:io.kestra.plugin:plugin-pulsar:LATEST
 #plugin-redis:io.kestra.plugin:plugin-redis:LATEST
+#plugin-scripts:io.kestra.plugin:plugin-script-bun:LATEST
+#plugin-scripts:io.kestra.plugin:plugin-script-deno:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-go:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-groovy:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-jbang:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-julia:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-jython:LATEST
+#plugin-scripts:io.kestra.plugin:plugin-script-lua:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-nashorn:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-node:LATEST
+#plugin-scripts:io.kestra.plugin:plugin-script-perl:LATEST
+#plugin-scripts:io.kestra.plugin:plugin-script-php:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-powershell:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-python:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-r:LATEST
@@ -18,6 +18,10 @@ micronaut:
     root:
       paths: classpath:root
       mapping: /**
+  codec:
+    json:
+      additional-types:
+        - application/scim+json
   server:
     max-request-size: 10GB
     multipart:
@@ -78,8 +82,19 @@ micronaut:
       type: scheduled
       core-pool-size: 1

-  # Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
   metrics:
+    binders:
+      retry:
+        enabled: true
+      netty:
+        queues:
+          enabled: true
+        bytebuf-allocators:
+          enabled: true
+        channels:
+          enabled: true
+
+    # Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
     export:
       otlp:
         enabled: false
@@ -92,6 +107,8 @@ jackson:
   serialization-inclusion: non_null
   deserialization:
     FAIL_ON_UNKNOWN_PROPERTIES: false
+  mapper:
+    ACCEPT_CASE_INSENSITIVE_ENUMS: true

 endpoints:
   all:
@@ -100,6 +117,10 @@ endpoints:
     sensitive: false
   health:
     details-visible: ANONYMOUS
+    disk-space:
+      enabled: false
+  discovery-client:
+    enabled: false
   loggers:
     write-sensitive: false
   env:
@@ -133,12 +154,46 @@ kestra:
   tutorial-flows:
     # Automatically loads all tutorial flows at startup.
     enabled: true

   retries:
     attempts: 5
     multiplier: 2.0
     delay: 1s
     maxDelay: ""

+  server:
+    basic-auth:
+      # These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
+      open-urls:
+        - "/ping"
+        - "/api/v1/executions/webhook/"
+        - "/api/v1/main/executions/webhook/"
+        - "/api/v1/*/executions/webhook/"
+
+    preview:
+      initial-rows: 100
+      max-rows: 5000
+
+    # The expected time for this server to complete all its tasks before initiating a graceful shutdown.
+    terminationGracePeriod: 5m
+    workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
+    # Configuration for Liveness and Heartbeat mechanism between servers.
+    liveness:
+      enabled: true
+      # The expected time between liveness probe.
+      interval: 10s
+      # The timeout used to detect service failures.
+      timeout: 1m
+      # The time to wait before executing a liveness probe.
+      initialDelay: 1m
+      # The expected time between service heartbeats.
+      heartbeatInterval: 3s
+    service:
+      purge:
+        initial-delay: 1h
+        fixed-delay: 1d
+        retention: 30d
+
   jdbc:
     queues:
       min-poll-interval: 25ms
@@ -150,7 +205,7 @@ kestra:
       fixed-delay: 1h
       retention: 7d
       types:
-        - type : io.kestra.core.models.executions.LogEntry
+        - type: io.kestra.core.models.executions.LogEntry
           retention: 1h
         - type: io.kestra.core.models.executions.MetricEntry
           retention: 1h
@@ -182,37 +237,12 @@ kestra:
   traces:
     root: DISABLED

-  server:
-    basic-auth:
-      # These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
-      open-urls:
-        - "/ping"
-        - "/api/v1/executions/webhook/"
-    preview:
-      initial-rows: 100
-      max-rows: 5000
-    # The expected time for this server to complete all its tasks before initiating a graceful shutdown.
-    terminationGracePeriod: 5m
-    workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
-    # Configuration for Liveness and Heartbeat mechanism between servers.
-    liveness:
-      enabled: true
-      # The expected time between liveness probe.
-      interval: 10s
-      # The timeout used to detect service failures.
-      timeout: 1m
-      # The time to wait before executing a liveness probe.
-      initialDelay: 1m
-      # The expected time between service heartbeats.
-      heartbeatInterval: 3s
-    service:
-      purge:
-        initial-delay: 1h
-        fixed-delay: 1d
-        retention: 30d
+  ui-anonymous-usage-report:
+    enabled: true
   anonymous-usage-report:
     enabled: true
-    uri: https://api.kestra.io/v1/reports/usages
+    uri: https://api.kestra.io/v1/reports/server-events
     initial-delay: 5m
     fixed-delay: 1h
@@ -63,6 +63,10 @@ dependencies {
         exclude group: 'com.fasterxml.jackson.core'
     }

+    // micrometer
+    implementation "io.micronaut.micrometer:micronaut-micrometer-observation"
+    implementation 'io.micrometer:micrometer-java21'
+
     // test
     testAnnotationProcessor project(':processor')
     testImplementation project(':tests')
@@ -122,12 +122,13 @@ public class JsonSchemaGenerator {
         if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
             List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
                 .map(JsonNode::asText)
-                .toList();
+                .collect(Collectors.toList());

             properties.fields().forEachRemaining(e -> {
                 int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
                 if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
                     requiredPropsNode.remove(indexInRequiredArray);
+                    requiredFieldValues.remove(indexInRequiredArray);
                 }
             });
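A note on the hunk above: Stream.toList() returns an unmodifiable list, so the newly added requiredFieldValues.remove(indexInRequiredArray) call would throw at runtime; Collectors.toList() yields a mutable ArrayList in practice (its contract does not guarantee mutability, but the JDK implementation provides it). A minimal standalone sketch of the difference, not taken from the Kestra codebase:

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ToListMutability {
        public static void main(String[] args) {
            // Stream.toList() returns an unmodifiable list: remove() throws.
            List<String> immutable = Stream.of("a", "b", "c").toList();
            try {
                immutable.remove(0);
            } catch (UnsupportedOperationException e) {
                System.out.println("Stream.toList() result is unmodifiable");
            }

            // Collectors.toList() returns a mutable ArrayList in the JDK implementation.
            List<String> mutable = Stream.of("a", "b", "c").collect(Collectors.toList());
            mutable.remove(0); // fine
            System.out.println(mutable); // [b, c]
        }
    }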
@@ -6,8 +6,14 @@ import io.kestra.core.http.HttpRequest;
 import io.kestra.core.http.HttpResponse;
 import io.kestra.core.http.client.apache.*;
 import io.kestra.core.http.client.configurations.HttpConfiguration;
+import io.kestra.core.runners.DefaultRunContext;
 import io.kestra.core.runners.RunContext;
 import io.kestra.core.serializers.JacksonMapper;
+import io.micrometer.common.KeyValues;
+import io.micrometer.core.instrument.binder.httpcomponents.hc5.ApacheHttpClientContext;
+import io.micrometer.core.instrument.binder.httpcomponents.hc5.DefaultApacheHttpClientObservationConvention;
+import io.micrometer.core.instrument.binder.httpcomponents.hc5.ObservationExecChainHandler;
+import io.micrometer.observation.ObservationRegistry;
 import io.micronaut.http.MediaType;
 import jakarta.annotation.Nullable;
 import lombok.Builder;
@@ -16,6 +22,7 @@ import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hc.client5.http.ContextBuilder;
 import org.apache.hc.client5.http.auth.*;
 import org.apache.hc.client5.http.config.ConnectionConfig;
+import org.apache.hc.client5.http.impl.ChainElement;
 import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy;
 import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
 import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
@@ -50,11 +57,16 @@ public class HttpClient implements Closeable {
     private transient CloseableHttpClient client;
     private final RunContext runContext;
     private final HttpConfiguration configuration;
+    private ObservationRegistry observationRegistry;

     @Builder
     public HttpClient(RunContext runContext, @Nullable HttpConfiguration configuration) throws IllegalVariableEvaluationException {
         this.runContext = runContext;
         this.configuration = configuration == null ? HttpConfiguration.builder().build() : configuration;
+        if (runContext instanceof DefaultRunContext defaultRunContext) {
+            this.observationRegistry = defaultRunContext.getApplicationContext().findBean(ObservationRegistry.class).orElse(null);
+        }
+
         this.client = this.createClient();
     }

@@ -67,6 +79,13 @@ public class HttpClient implements Closeable {
             .disableDefaultUserAgent()
             .setUserAgent("Kestra");

+        if (observationRegistry != null) {
+            // micrometer, must be placed before the retry strategy (see https://docs.micrometer.io/micrometer/reference/reference/httpcomponents.html#_retry_strategy_considerations)
+            builder.addExecInterceptorAfter(ChainElement.RETRY.name(), "micrometer",
+                new ObservationExecChainHandler(observationRegistry, new CustomApacheHttpClientObservationConvention())
+            );
+        }
+
         // logger
         if (this.configuration.getLogs() != null && this.configuration.getLogs().length > 0) {
             if (ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.REQUEST_HEADERS) ||
@@ -297,4 +316,14 @@ public class HttpClient implements Closeable {
             this.client.close();
         }
     }
+
+    public static class CustomApacheHttpClientObservationConvention extends DefaultApacheHttpClientObservationConvention {
+        @Override
+        public KeyValues getLowCardinalityKeyValues(ApacheHttpClientContext context) {
+            return KeyValues.concat(
+                super.getLowCardinalityKeyValues(context),
+                KeyValues.of("type", "core-client")
+            );
+        }
+    }
 }
@@ -0,0 +1,34 @@
+package io.kestra.core.metrics;
+
+import io.micrometer.core.instrument.binder.jvm.JvmThreadDeadlockMetrics;
+import io.micrometer.java21.instrument.binder.jdk.VirtualThreadMetrics;
+import io.micronaut.configuration.metrics.annotation.RequiresMetrics;
+import io.micronaut.context.annotation.Bean;
+import io.micronaut.context.annotation.Factory;
+import io.micronaut.context.annotation.Primary;
+import io.micronaut.context.annotation.Requires;
+import jakarta.inject.Singleton;
+
+import static io.micronaut.configuration.metrics.micrometer.MeterRegistryFactory.MICRONAUT_METRICS_BINDERS;
+import static io.micronaut.core.util.StringUtils.FALSE;
+
+@Factory
+@RequiresMetrics
+public class MeterRegistryBinderFactory {
+    @Bean
+    @Primary
+    @Singleton
+    @Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
+    public VirtualThreadMetrics virtualThreadMetrics() {
+        return new VirtualThreadMetrics();
+    }
+
+    @Bean
+    @Primary
+    @Singleton
+    @Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
+    public JvmThreadDeadlockMetrics threadDeadlockMetricsMetrics() {
+        return new JvmThreadDeadlockMetrics();
+    }
+}
@@ -1040,6 +1040,16 @@ public class Execution implements DeletedInterface, TenantInterface {
         return result;
     }

+    /**
+     * Find all children of this {@link TaskRun}.
+     */
+    public List<TaskRun> findChildren(TaskRun parentTaskRun) {
+        return taskRunList.stream()
+            .filter(taskRun -> parentTaskRun.getId().equals(taskRun.getParentTaskRunId()))
+            .toList();
+    }
+
     public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
         return (withCurrent ?
             Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :
@@ -116,7 +116,7 @@ public class State {
     }

     public Instant maxDate() {
-        if (this.histories.size() == 0) {
+        if (this.histories.isEmpty()) {
             return Instant.now();
         }

@@ -124,7 +124,7 @@ public class State {
     }

     public Instant minDate() {
-        if (this.histories.size() == 0) {
+        if (this.histories.isEmpty()) {
             return Instant.now();
         }

@@ -173,6 +173,11 @@ public class State {
         return this.current.isBreakpoint();
     }

+    @JsonIgnore
+    public boolean isQueued() {
+        return this.current.isQueued();
+    }
+
     @JsonIgnore
     public boolean isRetrying() {
         return this.current.isRetrying();
@@ -206,6 +211,14 @@ public class State {
         return this.histories.get(this.histories.size() - 2).state.isPaused();
     }

+    /**
+     * Return true if the execution has failed, then was restarted.
+     * This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
+     */
+    public boolean failedThenRestarted() {
+        return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
+    }
+
     @Introspected
     public enum Type {
         CREATED,
@@ -264,6 +277,10 @@ public class State {
         return this == Type.KILLED;
     }

+    public boolean isQueued(){
+        return this == Type.QUEUED;
+    }
+
     /**
      * @return states that are terminal to an execution
      */
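The failedThenRestarted() helper added above disambiguates the two paths into RESTARTED by peeking at the next-to-last history entry. A standalone sketch of the same check, using a simplified enum and history list rather than Kestra's State types:

    import java.util.List;

    public class RestartDisambiguation {
        enum Type { CREATED, RUNNING, PAUSED, FAILED, RESTARTED }

        // True only when RESTARTED was entered from FAILED,
        // mirroring the check on histories.get(size - 2).
        static boolean failedThenRestarted(List<Type> histories) {
            Type current = histories.get(histories.size() - 1);
            return current == Type.RESTARTED
                && histories.get(histories.size() - 2) == Type.FAILED;
        }

        public static void main(String[] args) {
            System.out.println(failedThenRestarted(
                List.of(Type.CREATED, Type.RUNNING, Type.FAILED, Type.RESTARTED))); // true
            System.out.println(failedThenRestarted(
                List.of(Type.CREATED, Type.RUNNING, Type.PAUSED, Type.RESTARTED))); // false
        }
    }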
@@ -69,6 +69,19 @@ public class Property<T> {
         return expression;
     }

+    /**
+     * Returns a new {@link Property} with no cached rendered value,
+     * so that the next render will evaluate its original Pebble expression.
+     * <p>
+     * The returned property will still cache its rendered result.
+     * To re-evaluate on a subsequent render, call {@code skipCache()} again.
+     *
+     * @return a new {@link Property} without a pre-rendered value
+     */
+    public Property<T> skipCache() {
+        return Property.ofExpression(expression);
+    }
+
     /**
      * Build a new Property object with a value already set.<br>
      *
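For context on skipCache(): a hedged usage sketch, assuming the usual RunContext#render(Property) entry point and the {{ now() }} Pebble function; the helper class and expression are illustrative, not taken from this diff. Because a Property caches its first rendered value, re-rendering the same instance returns the stale result; skipCache() hands back a fresh Property whose next render re-evaluates the expression.

    import io.kestra.core.exceptions.IllegalVariableEvaluationException;
    import io.kestra.core.models.property.Property;
    import io.kestra.core.runners.RunContext;

    class SkipCacheSketch {
        // Hypothetical helper: a real 'runContext' is provided to a task's run() method.
        String renderTwice(RunContext runContext) throws IllegalVariableEvaluationException {
            Property<String> stamp = Property.ofExpression("{{ now() }}");

            // First render evaluates the Pebble expression and caches the result.
            String first = runContext.render(stamp).as(String.class).orElseThrow();

            // skipCache() returns a fresh Property, so this render re-evaluates the
            // expression instead of reusing the value cached just above.
            String fresh = runContext.render(stamp.skipCache()).as(String.class).orElseThrow();

            return first + " / " + fresh;
        }
    }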
@@ -5,6 +5,7 @@ import io.kestra.core.models.Pauseable;
 import io.kestra.core.utils.Either;

 import java.io.Closeable;
+import java.util.List;
 import java.util.function.Consumer;

 public interface QueueInterface<T> extends Closeable, Pauseable {
@@ -18,7 +19,15 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
         emitAsync(null, message);
     }

-    void emitAsync(String consumerGroup, T message) throws QueueException;
+    default void emitAsync(String consumerGroup, T message) throws QueueException {
+        emitAsync(consumerGroup, List.of(message));
+    }
+
+    default void emitAsync(List<T> messages) throws QueueException {
+        emitAsync(null, messages);
+    }
+
+    void emitAsync(String consumerGroup, List<T> messages) throws QueueException;

     default void delete(T message) throws QueueException {
         delete(null, message);
@@ -27,7 +36,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
     void delete(String consumerGroup, T message) throws QueueException;

     default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
-        return receive((String) null, consumer);
+        return receive(null, consumer, false);
     }

     default Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {
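The refactor above narrows the abstract surface to the batch variant: implementations now provide only emitAsync(consumerGroup, List<T>), and the single-message overloads are default methods that wrap the message in a one-element list. A self-contained sketch of this batch-first default-method pattern, with a simplified interface rather than Kestra's QueueInterface:

    import java.util.ArrayList;
    import java.util.List;

    // Only the batch overload is abstract; single-message calls delegate to it.
    interface MiniQueue<T> {
        void emitAsync(String consumerGroup, List<T> messages);

        default void emitAsync(String consumerGroup, T message) {
            emitAsync(consumerGroup, List.of(message));
        }

        default void emitAsync(T message) {
            emitAsync(null, message);
        }
    }

    public class BatchFirstQueue implements MiniQueue<String> {
        private final List<String> buffer = new ArrayList<>();

        @Override
        public void emitAsync(String consumerGroup, List<String> messages) {
            buffer.addAll(messages); // one implementation serves all overloads
        }

        public static void main(String[] args) {
            BatchFirstQueue q = new BatchFirstQueue();
            q.emitAsync("single");                 // routed through the batch method
            q.emitAsync(null, List.of("a", "b"));  // batch directly
            System.out.println(q.buffer);          // [single, a, b]
        }
    }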
@@ -0,0 +1,12 @@
+package io.kestra.core.queues;
+
+import java.io.Serial;
+
+public class UnsupportedMessageException extends QueueException {
+    @Serial
+    private static final long serialVersionUID = 1L;
+
+    public UnsupportedMessageException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
@@ -161,7 +161,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Execution> {
     }

     List<Execution> lastExecutions(
-        @Nullable String tenantId,
+        String tenantId,
         @Nullable List<FlowFilter> flows
     );
 }
@@ -86,7 +86,7 @@ public class Executor {

     public Boolean canBeProcessed() {
         return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
-            this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint());
+            this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
     }

     public Executor withFlow(FlowWithSource flow) {
@@ -237,9 +237,9 @@ public class ExecutorService {
         try {
             state = flowableParent.resolveState(runContext, execution, parentTaskRun);
         } catch (Exception e) {
-            // This will lead to the next task being still executed but at least Kestra will not crash.
+            // This will lead to the next task being still executed, but at least Kestra will not crash.
             // This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
-            runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), e);
+            runContext.logger().error("Unable to resolve state from the Flowable task: {}", e.getMessage(), e);
             state = Optional.of(State.Type.FAILED);
         }
         Optional<WorkerTaskResult> endedTask = childWorkerTaskTypeToWorkerTask(
@@ -589,6 +589,23 @@ public class ExecutorService {
             list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId()))
                 .collect(Collectors.toCollection(ArrayList::new));
         }

+        // If the task is a flowable and its terminated, check that all children are terminated.
+        // This may not be the case for parallel flowable tasks like Parallel, Dag, ForEach...
+        // After a fail task, some child flowable may not be correctly terminated.
+        if (task instanceof FlowableTask<?> && taskRun.getState().isTerminated()) {
+            List<TaskRun> updated = executor.getExecution().findChildren(taskRun).stream()
+                .filter(child -> !child.getState().isTerminated())
+                .map(throwFunction(child -> child.withState(taskRun.getState().getCurrent())))
+                .toList();
+            if (!updated.isEmpty()) {
+                Execution execution = executor.getExecution();
+                for (TaskRun child : updated) {
+                    execution = execution.withTaskRun(child);
+                }
+                executor = executor.withExecution(execution, "handledTerminatedFlowableTasks");
+            }
+        }

         metricRegistry
@@ -1045,6 +1062,17 @@ public class ExecutorService {
         var executionUpdatingTask = (ExecutionUpdatableTask) workerTask.getTask();

         try {
+            // handle runIf
+            if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
+                executor.withExecution(
+                    executor
+                        .getExecution()
+                        .withTaskRun(workerTask.getTaskRun().withState(State.Type.SKIPPED)),
+                    "handleExecutionUpdatingTaskSkipped"
+                );
+                return false;
+            }
+
             executor.withExecution(
                 executionUpdatingTask.update(executor.getExecution(), workerTask.getRunContext())
                     .withTaskRun(workerTask.getTaskRun().withState(State.Type.RUNNING)),
@@ -1144,7 +1172,7 @@ public class ExecutorService {
             }
         }

-        return taskRuns.size() > execution.getTaskRunList().size() ? execution.withTaskRunList(taskRuns) : null;
+        return taskRuns.size() > ListUtils.emptyOnNull(execution.getTaskRunList()).size() ? execution.withTaskRunList(taskRuns) : null;
     }

     public boolean canBePurged(final Executor executor) {
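In the last hunk above, calling execution.getTaskRunList().size() directly would throw when the list is null (an execution with no task runs yet). ListUtils.emptyOnNull itself is not shown in this diff; assuming the behavior its name and call site imply, a sketch:

    import java.util.Collections;
    import java.util.List;

    public class EmptyOnNull {
        // Assumed behavior of the helper: substitute an empty list for null.
        static <T> List<T> emptyOnNull(List<T> list) {
            return list == null ? Collections.emptyList() : list;
        }

        public static void main(String[] args) {
            List<String> taskRunList = null;
            // Before the fix: taskRunList.size() would throw a NullPointerException.
            // After: the null case simply counts as zero task runs.
            System.out.println(emptyOnNull(taskRunList).size()); // 0
        }
    }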
@@ -29,7 +29,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;

 public class RunContextLogger implements Supplier<org.slf4j.Logger> {
-    private static final int MAX_MESSAGE_LENGTH = 1024 * 10;
+    private static final int MAX_MESSAGE_LENGTH = 1024 * 15;
     public static final String ORIGINAL_TIMESTAMP_KEY = "originalTimestamp";

     private final String loggerName;
@@ -80,7 +80,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
         }

         List<LogEntry> result = new ArrayList<>();
-        long i = 0;
         for (String s : split) {
             result.add(LogEntry.builder()
                 .namespace(logEntry.getNamespace())
@@ -98,7 +97,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
                 .thread(event.getThreadName())
                 .build()
             );
-            i++;
         }

         return result;
@@ -331,14 +329,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
         protected void append(ILoggingEvent e) {
             e = this.transform(e);

-            logEntries(e, logEntry)
-                .forEach(l -> {
-                    try {
-                        logQueue.emitAsync(l);
-                    } catch (QueueException ex) {
-                        log.warn("Unable to emit logQueue", ex);
-                    }
-                });
+            try {
+                logQueue.emitAsync(logEntries(e, logEntry));
+            } catch (QueueException ex) {
+                log.warn("Unable to emit logQueue", ex);
+            }
         }
     }
@@ -4,15 +4,11 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
 import io.kestra.core.models.property.Property;
 import io.kestra.core.models.tasks.Task;
 import io.kestra.core.models.triggers.AbstractTrigger;
-import jakarta.validation.ConstraintViolation;
-import jakarta.validation.ConstraintViolationException;
-import jakarta.validation.Validator;
 import lombok.extern.slf4j.Slf4j;

 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;

 import static io.kestra.core.utils.Rethrow.throwFunction;

@@ -28,11 +24,18 @@ public class RunContextProperty<T> {
     private final Task task;
     private final AbstractTrigger trigger;

+    private final boolean skipCache;
+
     RunContextProperty(Property<T> property, RunContext runContext) {
+        this(property, runContext, false);
+    }
+
+    RunContextProperty(Property<T> property, RunContext runContext, boolean skipCache) {
         this.property = property;
         this.runContext = runContext;
         this.task = ((DefaultRunContext) runContext).getTask();
         this.trigger = ((DefaultRunContext) runContext).getTrigger();
+        this.skipCache = skipCache;
     }

     private void validate() {
@@ -46,6 +49,19 @@ public class RunContextProperty<T> {
         }
     }

+    /**
+     * Returns a new {@link RunContextProperty} that will always be rendered by evaluating
+     * its original Pebble expression, without using any previously cached value.
+     * <p>
+     * This ensures that each time the property is rendered, the underlying
+     * expression is re-evaluated to produce a fresh result.
+     *
+     * @return a new {@link Property} that bypasses the cache
+     */
+    public RunContextProperty<T> skipCache() {
+        return new RunContextProperty<>(this.property, this.runContext, true);
+    }
+
     /**
      * Render a property then convert it to its target type and validate it.<br>
      *
@@ -55,7 +71,7 @@ public class RunContextProperty<T> {
      * Warning, due to the caching mechanism, this method is not thread-safe.
      */
     public Optional<T> as(Class<T> clazz) throws IllegalVariableEvaluationException {
-        var as = Optional.ofNullable(this.property)
+        var as = Optional.ofNullable(getProperty())
             .map(throwFunction(prop -> Property.as(prop, this.runContext, clazz)));

         validate();
@@ -71,7 +87,7 @@ public class RunContextProperty<T> {
      * Warning, due to the caching mechanism, this method is not thread-safe.
      */
     public Optional<T> as(Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
-        var as = Optional.ofNullable(this.property)
+        var as = Optional.ofNullable(getProperty())
             .map(throwFunction(prop -> Property.as(prop, this.runContext, clazz, variables)));

         validate();
@@ -89,7 +105,7 @@ public class RunContextProperty<T> {
      */
     @SuppressWarnings("unchecked")
     public <I> T asList(Class<I> itemClazz) throws IllegalVariableEvaluationException {
-        var as = Optional.ofNullable(this.property)
+        var as = Optional.ofNullable(getProperty())
             .map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz)))
             .orElse((T) Collections.emptyList());

@@ -108,7 +124,7 @@ public class RunContextProperty<T> {
      */
     @SuppressWarnings("unchecked")
     public <I> T asList(Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
-        var as = Optional.ofNullable(this.property)
+        var as = Optional.ofNullable(getProperty())
             .map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz, variables)))
             .orElse((T) Collections.emptyList());

@@ -127,7 +143,7 @@ public class RunContextProperty<T> {
      */
     @SuppressWarnings("unchecked")
     public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
-        var as = Optional.ofNullable(this.property)
+        var as = Optional.ofNullable(getProperty())
             .map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass)))
             .orElse((T) Collections.emptyMap());

@@ -146,11 +162,15 @@ public class RunContextProperty<T> {
      */
     @SuppressWarnings("unchecked")
     public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
-        var as = Optional.ofNullable(this.property)
+        var as = Optional.ofNullable(getProperty())
             .map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass, variables)))
             .orElse((T) Collections.emptyMap());

         validate();
         return as;
     }
+
+    private Property<T> getProperty() {
+        return skipCache ? this.property.skipCache() : this.property;
+    }
 }
@@ -45,7 +45,7 @@ final class Secret {
         for (var entry: data.entrySet()) {
             if (entry.getValue() instanceof Map map) {
                 // if some value are of type EncryptedString we decode them and replace the object
-                if (EncryptedString.TYPE.equalsIgnoreCase((String)map.get("type"))) {
+                if (map.get("type") instanceof String typeStr && EncryptedString.TYPE.equalsIgnoreCase(typeStr)) {
                     try {
                         String decoded = decrypt((String) map.get("value"));
                         decryptedMap.put(entry.getKey(), decoded);
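The hunk above swaps an unchecked cast for a pattern-matching instanceof: if map.get("type") ever holds a non-String value, the old code threw a ClassCastException, while the new code simply skips the entry. A standalone illustration, not Kestra code:

    import java.util.Map;

    public class InstanceofGuard {
        public static void main(String[] args) {
            Map<String, Object> map = Map.<String, Object>of("type", 42); // "type" is not a String here

            // Old style: unchecked cast throws ClassCastException on non-String values.
            try {
                String t = (String) map.get("type");
                System.out.println("ENCRYPTED".equalsIgnoreCase(t));
            } catch (ClassCastException e) {
                System.out.println("unchecked cast failed");
            }

            // New style: the pattern only matches when the value actually is a String,
            // so non-String values are skipped instead of crashing.
            if (map.get("type") instanceof String typeStr && "ENCRYPTED".equalsIgnoreCase(typeStr)) {
                System.out.println("matched");
            } else {
                System.out.println("not a String, safely ignored");
            }
        }
    }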
@@ -508,14 +508,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
         Execution execution = workerTrigger.getTrigger().isFailOnTriggerError() ? TriggerService.generateExecution(workerTrigger.getTrigger(), workerTrigger.getConditionContext(), workerTrigger.getTriggerContext(), (Output) null)
             .withState(FAILED) : null;
         if (execution != null) {
-            RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution))
-                .forEach(log -> {
-                    try {
-                        logQueue.emitAsync(log);
-                    } catch (QueueException ex) {
-                        // fail silently
-                    }
-                });
+            try {
+                logQueue.emitAsync(RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution)));
+            } catch (QueueException ex) {
+                // fail silently
+            }
         }
         this.workerTriggerResultQueue.emit(
             WorkerTriggerResult.builder()
@@ -764,6 +761,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
         workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));

         WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);

         this.workerTaskResultQueue.emit(workerTaskResult);
+
         // upload the cache file, hash may not be present if we didn't succeed in computing it
@@ -796,6 +794,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
             // If it's a message too big, we remove the outputs
             failed = failed.withOutputs(Variables.empty());
         }
+        if (e instanceof UnsupportedMessageException) {
+            // we expect the offending char is in the output so we remove it
+            failed = failed.withOutputs(Variables.empty());
+        }
         WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
         RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
         contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
@@ -818,7 +820,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
     private Optional<String> hashTask(RunContext runContext, Task task) {
         try {
             var map = JacksonMapper.toMap(task);
-            var rMap = runContext.render(map);
+            // If there are task provided variables, rendering the task may fail.
+            // The best we can do is to add a fake 'workingDir' as it's an often added variables,
+            // and it should not be part of the task hash.
+            Map<String, Object> variables = Map.of("workingDir", "workingDir");
+            var rMap = runContext.render(map, variables);
             var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
             MessageDigest digest = MessageDigest.getInstance("SHA-256");
             digest.update(json);
@@ -8,6 +8,7 @@ import io.kestra.core.events.CrudEventType;
|
|||||||
import io.kestra.core.exceptions.DeserializationException;
|
import io.kestra.core.exceptions.DeserializationException;
|
||||||
import io.kestra.core.exceptions.InternalException;
|
import io.kestra.core.exceptions.InternalException;
|
||||||
import io.kestra.core.metrics.MetricRegistry;
|
import io.kestra.core.metrics.MetricRegistry;
|
||||||
|
import io.kestra.core.models.HasUID;
|
||||||
import io.kestra.core.models.conditions.Condition;
|
import io.kestra.core.models.conditions.Condition;
|
||||||
import io.kestra.core.models.conditions.ConditionContext;
|
import io.kestra.core.models.conditions.ConditionContext;
|
||||||
import io.kestra.core.models.executions.Execution;
|
import io.kestra.core.models.executions.Execution;
|
||||||
@@ -29,10 +30,7 @@ import io.kestra.core.server.Service;
|
|||||||
import io.kestra.core.server.ServiceStateChangeEvent;
|
import io.kestra.core.server.ServiceStateChangeEvent;
|
||||||
import io.kestra.core.server.ServiceType;
|
import io.kestra.core.server.ServiceType;
|
||||||
import io.kestra.core.services.*;
|
import io.kestra.core.services.*;
|
||||||
import io.kestra.core.utils.Await;
|
import io.kestra.core.utils.*;
|
||||||
import io.kestra.core.utils.Either;
|
|
||||||
import io.kestra.core.utils.IdUtils;
|
|
||||||
import io.kestra.core.utils.ListUtils;
|
|
||||||
import io.micronaut.context.ApplicationContext;
|
import io.micronaut.context.ApplicationContext;
|
||||||
import io.micronaut.context.event.ApplicationEventPublisher;
|
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||||
import io.micronaut.core.util.CollectionUtils;
|
import io.micronaut.core.util.CollectionUtils;
|
||||||
@@ -91,7 +89,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
private volatile Boolean isReady = false;
|
private volatile Boolean isReady = false;
|
||||||
|
|
||||||
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
|
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
private ScheduledFuture<?> scheduledFuture;
|
||||||
private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
|
private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
private ScheduledFuture<?> executionMonitorFuture;
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
protected SchedulerTriggerStateInterface triggerState;
|
protected SchedulerTriggerStateInterface triggerState;
|
||||||
@@ -152,7 +152,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
this.flowListeners.run();
|
this.flowListeners.run();
|
||||||
this.flowListeners.listen(this::initializedTriggers);
|
this.flowListeners.listen(this::initializedTriggers);
|
||||||
|
|
||||||
ScheduledFuture<?> evaluationLoop = scheduleExecutor.scheduleAtFixedRate(
|
scheduledFuture = scheduleExecutor.scheduleAtFixedRate(
|
||||||
this::handle,
|
this::handle,
|
||||||
0,
|
0,
|
||||||
1,
|
1,
|
||||||
@@ -162,10 +162,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
// look at exception on the evaluation loop thread
|
// look at exception on the evaluation loop thread
|
||||||
Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
|
Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
|
||||||
() -> {
|
() -> {
|
||||||
Await.until(evaluationLoop::isDone);
|
Await.until(scheduledFuture::isDone);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
evaluationLoop.get();
|
scheduledFuture.get();
|
||||||
} catch (CancellationException ignored) {
|
} catch (CancellationException ignored) {
|
||||||
|
|
||||||
} catch (ExecutionException | InterruptedException e) {
|
} catch (ExecutionException | InterruptedException e) {
|
||||||
@@ -177,7 +177,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// Periodically report metrics and logs of running executions
|
// Periodically report metrics and logs of running executions
|
||||||
ScheduledFuture<?> monitoringLoop = executionMonitorExecutor.scheduleWithFixedDelay(
|
executionMonitorFuture = executionMonitorExecutor.scheduleWithFixedDelay(
|
||||||
this::executionMonitor,
|
this::executionMonitor,
|
||||||
30,
|
30,
|
||||||
10,
|
10,
|
||||||
@@ -187,10 +187,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
// look at exception on the monitoring loop thread
|
// look at exception on the monitoring loop thread
|
||||||
Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
|
Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
|
||||||
() -> {
|
() -> {
|
||||||
Await.until(monitoringLoop::isDone);
|
Await.until(executionMonitorFuture::isDone);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
monitoringLoop.get();
|
executionMonitorFuture.get();
|
||||||
} catch (CancellationException ignored) {
|
} catch (CancellationException ignored) {
|
||||||
|
|
||||||
} catch (ExecutionException | InterruptedException e) {
|
} catch (ExecutionException | InterruptedException e) {
|
||||||
@@ -302,6 +302,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
// Initialized local trigger state,
|
// Initialized local trigger state,
|
||||||
// and if some flows were created outside the box, for example from the CLI,
|
// and if some flows were created outside the box, for example from the CLI,
|
||||||
// then we may have some triggers that are not created yet.
|
// then we may have some triggers that are not created yet.
|
||||||
|
/* FIXME: There is a race between Kafka stream consumption & initializedTriggers: we can override a trigger update coming from a stream consumption with an old one because stream consumption is not waiting for trigger initialization
|
||||||
|
* Example: we see a SUCCESS execution so we reset the trigger's executionId but then the initializedTriggers resubmits an old trigger state for some reasons (evaluationDate for eg.) */
|
||||||
private void initializedTriggers(List<FlowWithSource> flows) {
|
private void initializedTriggers(List<FlowWithSource> flows) {
|
||||||
record FlowAndTrigger(FlowWithSource flow, AbstractTrigger trigger) {
|
record FlowAndTrigger(FlowWithSource flow, AbstractTrigger trigger) {
|
||||||
@Override
|
@Override
|
||||||
@@ -318,7 +320,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
synchronized (this) { // we need a sync block as we read then update so we should not do it in multiple threads concurrently
|
synchronized (this) { // we need a sync block as we read then update so we should not do it in multiple threads concurrently
|
||||||
List<Trigger> triggers = triggerState.findAllForAllTenants();
|
Map<String, Trigger> triggers = triggerState.findAllForAllTenants().stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
|
||||||
|
|
||||||
flows
|
flows
|
||||||
.stream()
|
.stream()
|
||||||
@@ -328,7 +330,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
|||||||
 .flatMap(flow -> flow.getTriggers().stream().filter(trigger -> trigger instanceof WorkerTriggerInterface).map(trigger -> new FlowAndTrigger(flow, trigger)))
 .distinct()
 .forEach(flowAndTrigger -> {
-    Optional<Trigger> trigger = triggers.stream().filter(t -> t.uid().equals(Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger()))).findFirst(); // must have one or none
+    String triggerUid = Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger());
+    Optional<Trigger> trigger = Optional.ofNullable(triggers.get(triggerUid));
     if (trigger.isEmpty()) {
         RunContext runContext = runContextFactory.of(flowAndTrigger.flow(), flowAndTrigger.trigger());
         ConditionContext conditionContext = conditionService.conditionContext(runContext, flowAndTrigger.flow(), null);

@@ -371,10 +374,13 @@ public abstract class AbstractScheduler implements Scheduler, Service {

                 this.triggerState.update(lastUpdate);
             }
-        } else if (recoverMissedSchedules == RecoverMissedSchedules.NONE) {
-            lastUpdate = trigger.get().toBuilder().nextExecutionDate(schedule.nextEvaluationDate()).build();
+        } else {
+            ZonedDateTime nextEvaluationDate = schedule.nextEvaluationDate();
+            if (recoverMissedSchedules == RecoverMissedSchedules.NONE && !Objects.equals(trigger.get().getNextExecutionDate(), nextEvaluationDate)) {
+                lastUpdate = trigger.get().toBuilder().nextExecutionDate(nextEvaluationDate).build();

                 this.triggerState.update(lastUpdate);
+            }
         }
         // Used for schedulableNextDate
         FlowWithWorkerTrigger flowWithWorkerTrigger = FlowWithWorkerTrigger.builder()

@@ -467,9 +473,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {

     private List<FlowWithTriggers> computeSchedulable(List<FlowWithSource> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
         List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
+        List<String> flowIds = flows.stream().map(FlowId::uidWithoutRevision).toList();
+        Map<String, Trigger> triggerById = triggerContextsToEvaluate.stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));

+        // delete trigger which flow has been deleted
         triggerContextsToEvaluate.stream()
-            .filter(trigger -> !flows.stream().map(FlowId::uidWithoutRevision).toList().contains(FlowId.uid(trigger)))
+            .filter(trigger -> !flowIds.contains(FlowId.uid(trigger)))
             .forEach(trigger -> {
                 try {
                     this.triggerState.delete(trigger);

@@ -491,12 +500,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
             .map(abstractTrigger -> {
                 RunContext runContext = runContextFactory.of(flow, abstractTrigger);
                 ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
-                Trigger triggerContext = null;
-                Trigger lastTrigger = triggerContextsToEvaluate
-                    .stream()
-                    .filter(triggerContextToFind -> triggerContextToFind.uid().equals(Trigger.uid(flow, abstractTrigger)))
-                    .findFirst()
-                    .orElse(null);
+                Trigger triggerContext;
+                Trigger lastTrigger = triggerById.get(Trigger.uid(flow, abstractTrigger));
                 // If a trigger is not found in triggers to evaluate, then we ignore it
                 if (lastTrigger == null) {
                     return null;

@@ -1006,8 +1011,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {

         setState(ServiceState.TERMINATING);
         this.receiveCancellations.forEach(Runnable::run);
-        this.scheduleExecutor.shutdown();
-        this.executionMonitorExecutor.shutdown();
+        ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));
+        ExecutorsUtils.closeScheduledThreadPool(executionMonitorExecutor, Duration.ofSeconds(5), List.of(executionMonitorFuture));
         try {
             if (onClose != null) {
                 onClose.run();
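
The scheduler hunks above replace repeated linear scans over the evaluated triggers with a one-time index keyed by trigger UID, turning each per-flow lookup from O(n) into a constant-time map get. A minimal, self-contained sketch of the pattern; the `TriggerCtx` record and the UID strings are hypothetical stand-ins for Kestra's `Trigger` and `HasUID` types:

    import java.util.List;
    import java.util.Map;
    import java.util.Optional;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    public class TriggerLookupSketch {
        // Hypothetical stand-in for Kestra's Trigger / HasUID types.
        record TriggerCtx(String uid) {}

        public static void main(String[] args) {
            List<TriggerCtx> triggers = List.of(new TriggerCtx("ns.flow.t1"), new TriggerCtx("ns.flow.t2"));

            // Before: a full scan of the list for every flow/trigger pair.
            Optional<TriggerCtx> scanned = triggers.stream()
                .filter(t -> t.uid().equals("ns.flow.t1"))
                .findFirst();

            // After: build the index once, then each lookup is a constant-time map get.
            Map<String, TriggerCtx> byUid = triggers.stream()
                .collect(Collectors.toMap(TriggerCtx::uid, Function.identity()));
            Optional<TriggerCtx> indexed = Optional.ofNullable(byUid.get("ns.flow.t1"));

            System.out.println(scanned.equals(indexed)); // true
        }
    }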

@@ -1,6 +1,7 @@
 package io.kestra.core.server;

 import com.google.common.annotations.VisibleForTesting;
+import io.kestra.core.utils.ExecutorsUtils;
 import io.micronaut.core.annotation.Introspected;
 import jakarta.annotation.PostConstruct;
 import jakarta.annotation.PreDestroy;
@@ -8,9 +9,11 @@ import lombok.extern.slf4j.Slf4j;

 import java.time.Duration;
 import java.time.Instant;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

@@ -25,6 +28,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
     protected final ServerConfig serverConfig;
     private final AtomicBoolean isStopped = new AtomicBoolean(false);
     private ScheduledExecutorService scheduledExecutorService;
+    private ScheduledFuture<?> scheduledFuture;
     private Instant lastScheduledExecution;

     /**
@@ -98,7 +102,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
         Duration scheduleInterval = getScheduleInterval();
         log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval);
-        scheduledExecutorService.scheduleAtFixedRate(
+        scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
             this,
             0,
             scheduleInterval.toSeconds(),
@@ -133,20 +137,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
     @Override
     public void close() {
         if (isStopped.compareAndSet(false, true) && scheduledExecutorService != null) {
-            scheduledExecutorService.shutdown();
-            if (scheduledExecutorService.isTerminated()) {
-                return;
-            }
-            try {
-                if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
-                    log.debug("Failed to wait for scheduled '{}' task termination. Cause: Timeout", name);
-                }
-                log.debug("Stopped scheduled '{}' task.", name);
-            } catch (InterruptedException e) {
-                scheduledExecutorService.shutdownNow();
-                Thread.currentThread().interrupt();
-                log.debug("Failed to wait for scheduled '{}' task termination. Cause: Interrupted.", name);
-            }
+            ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(scheduledFuture));
         }
     }
 }

@@ -317,6 +317,32 @@ public class ExecutionService {
         return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
     }

+    public Execution changeTaskRunState(final Execution execution, Flow flow, String taskRunId, State.Type newState) throws Exception {
+        Execution newExecution = markAs(execution, flow, taskRunId, newState);
+
+        // if the execution was terminated, it could have executed errors/finally/afterExecutions, we must remove them as the execution will be restarted
+        if (execution.getState().isTerminated()) {
+            List<TaskRun> newTaskRuns = newExecution.getTaskRunList();
+            // We need to remove global error tasks and flowable error tasks if any
+            flow
+                .allErrorsWithChildren()
+                .forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
+
+            // We need to remove global finally tasks and flowable error tasks if any
+            flow
+                .allFinallyWithChildren()
+                .forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
+
+            // We need to remove afterExecution tasks
+            ListUtils.emptyOnNull(flow.getAfterExecution())
+                .forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
+
+            return newExecution.withTaskRunList(newTaskRuns);
+        } else {
+            return newExecution;
+        }
+    }
+
     public Execution markAs(final Execution execution, FlowInterface flow, String taskRunId, State.Type newState) throws Exception {
         return this.markAs(execution, flow, taskRunId, newState, null, null);
     }
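
The new `changeTaskRunState` method strips the task runs produced by error, finally, and afterExecution branches when the execution had already terminated, so a restart evaluates those branches again. A toy sketch of the same `removeIf` pattern; the task ids here are hypothetical stand-ins for the branch tasks:

    import java.util.ArrayList;
    import java.util.List;

    public class BranchRunRemovalSketch {
        // Hypothetical stand-in for Kestra's TaskRun.
        record TaskRun(String taskId) {}

        public static void main(String[] args) {
            List<TaskRun> taskRuns = new ArrayList<>(List.of(
                new TaskRun("main"), new TaskRun("on_error"), new TaskRun("cleanup")));

            // Hypothetical ids of tasks belonging to errors/finally/afterExecution branches.
            List<String> branchTaskIds = List.of("on_error", "cleanup");

            // Same removeIf pattern as changeTaskRunState: drop the branch task runs
            // so that a restart can evaluate those branches again.
            branchTaskIds.forEach(id -> taskRuns.removeIf(run -> run.taskId().equals(id)));

            System.out.println(taskRuns); // [TaskRun[taskId=main]]
        }
    }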

@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.models.flows.FlowWithException;
 import io.kestra.core.models.flows.FlowWithSource;
 import io.kestra.core.models.flows.GenericFlow;
+import io.kestra.core.models.tasks.RunnableTask;
 import io.kestra.core.models.topologies.FlowTopology;
 import io.kestra.core.models.triggers.AbstractTrigger;
 import io.kestra.core.models.validations.ModelValidator;
@@ -51,7 +52,6 @@ import java.util.stream.StreamSupport;
 @Singleton
 @Slf4j
 public class FlowService {
-
     @Inject
     Optional<FlowRepositoryInterface> flowRepository;

@@ -236,6 +236,7 @@ public class FlowService {
         }

         List<String> warnings = new ArrayList<>(checkValidSubflows(flow, tenantId));
+
         List<io.kestra.plugin.core.trigger.Flow> flowTriggers = ListUtils.emptyOnNull(flow.getTriggers()).stream()
             .filter(io.kestra.plugin.core.trigger.Flow.class::isInstance)
             .map(io.kestra.plugin.core.trigger.Flow.class::cast)
@@ -246,6 +247,21 @@ public class FlowService {
             }
         });

+        // add warning for runnable properties (timeout, workerGroup, taskCache) when used not in a runnable
+        flow.allTasksWithChilds().forEach(task -> {
+            if (!(task instanceof RunnableTask<?>)) {
+                if (task.getTimeout() != null) {
+                    warnings.add("The task '" + task.getId() + "' cannot use the 'timeout' property as it's only relevant for runnable tasks.");
+                }
+                if (task.getTaskCache() != null) {
+                    warnings.add("The task '" + task.getId() + "' cannot use the 'taskCache' property as it's only relevant for runnable tasks.");
+                }
+                if (task.getWorkerGroup() != null) {
+                    warnings.add("The task '" + task.getId() + "' cannot use the 'workerGroup' property as it's only relevant for runnable tasks.");
+                }
+            }
+        });
+
         return warnings;
     }


@@ -1,6 +1,7 @@
 package io.kestra.core.services;

 import io.kestra.core.models.executions.Execution;
+import io.kestra.core.models.executions.ExecutionKind;
 import io.kestra.core.models.flows.Flow;
 import io.kestra.core.models.flows.FlowWithException;
 import io.kestra.core.models.flows.FlowWithSource;
@@ -10,7 +11,6 @@ import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInte
 import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
 import io.kestra.core.runners.RunContextFactory;
 import io.kestra.core.utils.ListUtils;
-import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -24,14 +24,15 @@ import java.util.stream.Stream;

 @Singleton
 public class FlowTriggerService {
-    @Inject
-    private ConditionService conditionService;
-
-    @Inject
-    private RunContextFactory runContextFactory;
-
-    @Inject
-    private FlowService flowService;
+    private final ConditionService conditionService;
+    private final RunContextFactory runContextFactory;
+    private final FlowService flowService;
+
+    public FlowTriggerService(ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) {
+        this.conditionService = conditionService;
+        this.runContextFactory = runContextFactory;
+        this.flowService = flowService;
+    }

     // used in EE only
     public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<FlowWithSource> allFlows) {
@@ -53,6 +54,8 @@ public class FlowTriggerService {
         List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
             // prevent recursive flow triggers
             .filter(flow -> flowService.removeUnwanted(flow, execution))
+            // filter out Test Executions
+            .filter(flow -> execution.getKind() == null)
             // ensure flow & triggers are enabled
             .filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
             .filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
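
The `FlowTriggerService` hunk swaps field injection for constructor injection, making the dependencies `final` and explicit. One practical effect is that the class can be instantiated in a plain unit test without a DI container. A small sketch under that assumption; `TriggerEvaluator` and `ConditionChecker` are illustrative names, not Kestra's API:

    import java.util.Objects;

    public class ConstructorInjectionSketch {
        interface ConditionChecker {
            boolean check(String expression);
        }

        // Mirrors the FlowTriggerService change: constructor injection makes the
        // dependency final and explicit, so the class can be built in a plain
        // unit test without any DI container.
        static class TriggerEvaluator {
            private final ConditionChecker checker;

            TriggerEvaluator(ConditionChecker checker) {
                this.checker = Objects.requireNonNull(checker);
            }

            boolean evaluate(String expression) {
                return checker.check(expression);
            }
        }

        public static void main(String[] args) {
            TriggerEvaluator evaluator = new TriggerEvaluator("true"::equals);
            System.out.println(evaluator.evaluate("true")); // true
        }
    }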

@@ -173,18 +173,15 @@ public class PluginDefaultService {
         try {
             return this.injectAllDefaults(flow, false);
         } catch (Exception e) {
-            RunContextLogger
-                .logEntries(
-                    Execution.loggingEventFromException(e),
-                    LogEntry.of(execution)
-                )
-                .forEach(logEntry -> {
-                    try {
-                        logQueue.emitAsync(logEntry);
-                    } catch (QueueException e1) {
-                        // silently do nothing
-                    }
-                });
+            try {
+                logQueue.emitAsync(RunContextLogger
+                    .logEntries(
+                        Execution.loggingEventFromException(e),
+                        LogEntry.of(execution)
+                    ));
+            } catch (QueueException e1) {
+                // silently do nothing
+            }
             return readWithoutDefaultsOrThrow(flow);
         }
     }

@@ -3,12 +3,16 @@ package io.kestra.core.utils;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;

+import java.time.Duration;
+import java.util.List;
 import java.util.concurrent.*;

 import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;

 @Singleton
+@Slf4j
 public class ExecutorsUtils {
     @Inject
     private ThreadMainFactoryBuilder threadFactoryBuilder;
@@ -61,6 +65,29 @@ public class ExecutorsUtils {
         );
     }

+    public static void closeScheduledThreadPool(ScheduledExecutorService scheduledExecutorService, Duration gracePeriod, List<ScheduledFuture<?>> taskFutures) {
+        scheduledExecutorService.shutdown();
+        if (scheduledExecutorService.isTerminated()) {
+            return;
+        }
+
+        try {
+            if (!scheduledExecutorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
+                log.warn("Failed to shutdown the ScheduledThreadPoolExecutor during grace period, forcing it to shutdown now");

+                // Ensure the scheduled task reaches a terminal state to avoid possible memory leak
+                ListUtils.emptyOnNull(taskFutures).forEach(taskFuture -> taskFuture.cancel(true));
+
+                scheduledExecutorService.shutdownNow();
+            }
+            log.debug("Stopped scheduled ScheduledThreadPoolExecutor.");
+        } catch (InterruptedException e) {
+            scheduledExecutorService.shutdownNow();
+            Thread.currentThread().interrupt();
+            log.debug("Failed to shutdown the ScheduledThreadPoolExecutor.");
+        }
+    }
+
     private ExecutorService wrap(String name, ExecutorService executorService) {
         return ExecutorServiceMetrics.monitor(
             meterRegistry,
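
The new `closeScheduledThreadPool` helper centralizes the shutdown sequence previously inlined in `AbstractServiceLivenessTask`: orderly shutdown, bounded wait, then cancelling the periodic futures so they reach a terminal state before forcing `shutdownNow()`. A standalone sketch of the same sequence, using a five-second grace period to mirror the call sites above:

    import java.time.Duration;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    public class GracefulShutdownSketch {
        public static void main(String[] args) throws InterruptedException {
            ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
            ScheduledFuture<?> future = pool.scheduleAtFixedRate(
                () -> System.out.println("tick"), 0, 1, TimeUnit.SECONDS);

            Thread.sleep(1500);

            // Same sequence as closeScheduledThreadPool: orderly shutdown, bounded
            // wait, then cancel the periodic future so it reaches a terminal state
            // before forcing shutdownNow().
            Duration gracePeriod = Duration.ofSeconds(5);
            pool.shutdown();
            if (!pool.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
                List.of(future).forEach(f -> f.cancel(true));
                pool.shutdownNow();
            }
        }
    }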

@@ -54,9 +54,10 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
             violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it.");
         }

+        List<Task> allTasks = value.allTasksWithChilds();
+
         // tasks unique id
-        List<String> taskIds = value.allTasksWithChilds()
-            .stream()
+        List<String> taskIds = allTasks.stream()
             .map(Task::getId)
             .toList();

@@ -72,8 +73,8 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
             violations.add("Duplicate trigger id with name [" + String.join(", ", duplicateIds) + "]");
         }

-        value.allTasksWithChilds()
-            .stream().filter(task -> task instanceof ExecutableTask<?> executableTask
+        allTasks.stream()
+            .filter(task -> task instanceof ExecutableTask<?> executableTask
                 && value.getId().equals(executableTask.subflowId().flowId())
                 && value.getNamespace().equals(executableTask.subflowId().namespace()))
             .forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));
@@ -102,7 +103,7 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
             .map(input -> Pattern.compile("\\{\\{\\s*inputs." + input.getId() + "\\s*\\}\\}"))
             .collect(Collectors.toList());

-        List<String> invalidTasks = value.allTasks()
+        List<String> invalidTasks = allTasks.stream()
             .filter(task -> checkObjectFieldsWithPatterns(task, inputsWithMinusPatterns))
             .map(task -> task.getId())
             .collect(Collectors.toList());
@@ -112,12 +113,12 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
             " [" + String.join(", ", invalidTasks) + "]");
         }

-        List<Pattern> outputsWithMinusPattern = value.allTasks()
+        List<Pattern> outputsWithMinusPattern = allTasks.stream()
             .filter(output -> Optional.ofNullable(output.getId()).orElse("").contains("-"))
             .map(output -> Pattern.compile("\\{\\{\\s*outputs\\." + output.getId() + "\\.[^}]+\\s*\\}\\}"))
             .collect(Collectors.toList());

-        invalidTasks = value.allTasks()
+        invalidTasks = allTasks.stream()
             .filter(task -> checkObjectFieldsWithPatterns(task, outputsWithMinusPattern))
             .map(task -> task.getId())
             .collect(Collectors.toList());

@@ -90,7 +90,7 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
     private static final String OUTPUTS_VAR = "outputs";

     @NotNull
-    private Property<String> expression;
+    private Property<Boolean> expression;

     /** {@inheritDoc} **/
     @SuppressWarnings("unchecked")
@@ -106,8 +106,7 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
             Map.of(TRIGGER_VAR, Map.of(OUTPUTS_VAR, conditionContext.getExecution().getOutputs()))
         );

-        String render = conditionContext.getRunContext().render(expression).as(String.class, variables).orElseThrow();
-        return !(render.isBlank() || render.trim().equals("false"));
+        return conditionContext.getRunContext().render(expression).skipCache().as(Boolean.class, variables).orElseThrow();
     }

     private boolean hasNoOutputs(final Execution execution) {
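
The `ExecutionOutputs` change moves the condition from a `Property<String>` with hand-rolled truthiness to a `Property<Boolean>` rendered straight to a boolean, with the render cache skipped so each evaluation sees the current variables. A sketch of the semantic difference; `Boolean.parseBoolean` stands in for the property conversion, which the real code delegates to the rendering machinery:

    public class BooleanConditionSketch {
        public static void main(String[] args) {
            String render = "false";

            // Old behaviour: render to a String, then apply hand-rolled truthiness.
            boolean oldResult = !(render.isBlank() || render.trim().equals("false"));

            // New behaviour, conceptually: the rendered expression is converted to
            // a Boolean by the property machinery; Boolean.parseBoolean stands in
            // for that conversion here.
            boolean newResult = Boolean.parseBoolean(render);

            System.out.println(oldResult + " " + newResult); // false false
        }
    }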

@@ -19,7 +19,6 @@ import lombok.experimental.SuperBuilder;
 @NoArgsConstructor
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 @EqualsAndHashCode
-//@TriggersDataFilterValidation
 @Schema(
     title = "Display Execution data in a dashboard chart.",
     description = "Execution data can be displayed in charts broken out by Namespace and filtered by State, for example."

@@ -21,6 +21,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
 import jakarta.validation.constraints.NotNull;
 import lombok.*;
 import lombok.experimental.SuperBuilder;
+import lombok.extern.slf4j.Slf4j;

 import java.util.Optional;

@@ -68,6 +69,7 @@ import java.util.Optional;
     )
 }
 )
+@Slf4j
 public class Exit extends Task implements ExecutionUpdatableTask {
     @NotNull
     @Schema(
@@ -104,12 +106,13 @@ public class Exit extends Task implements ExecutionUpdatableTask {
                 // ends all parents
                 while (newTaskRun.getParentTaskRunId() != null) {
                     newTaskRun = newExecution.findTaskRunByTaskRunId(newTaskRun.getParentTaskRunId()).withState(exitState);
-                    newExecution = execution.withTaskRun(newTaskRun);
+                    newExecution = newExecution.withTaskRun(newTaskRun);
                 }
                 return newExecution;
             } catch (InternalException e) {
                 // in case we cannot update the last not terminated task run, we ignore it
-                return execution;
+                log.warn("Unable to update the taskrun state", e);
+                return execution.withState(exitState);
             }
         })
         .orElse(execution)

@@ -208,48 +208,50 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
             return Optional.empty();
         }

-        boolean isOutputsAllowed = runContext
-            .<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
-            .orElse(true);
-
         final Output.OutputBuilder builder = Output.builder()
             .executionId(execution.getId())
             .state(execution.getState().getCurrent());

-        final Map<String, Object> subflowOutputs = Optional
-            .ofNullable(flow.getOutputs())
-            .map(outputs -> outputs
-                .stream()
-                .collect(Collectors.toMap(
-                    io.kestra.core.models.flows.Output::getId,
-                    io.kestra.core.models.flows.Output::getValue)
-                )
-            )
-            .orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
-
         VariablesService variablesService = ((DefaultRunContext) runContext).getApplicationContext().getBean(VariablesService.class);
-        if (subflowOutputs != null) {
-            try {
-                Map<String, Object> outputs = runContext.render(subflowOutputs);
-                FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
-                if (flow.getOutputs() != null && flowInputOutput != null) {
-                    outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
-                }
-                builder.outputs(outputs);
-            } catch (Exception e) {
-                runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
-                var state = State.Type.fail(this);
-                Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
-                taskRun = taskRun
-                    .withState(state)
-                    .withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
-                    .withOutputs(variables);
-
-                return Optional.of(SubflowExecutionResult.builder()
-                    .executionId(execution.getId())
-                    .state(State.Type.FAILED)
-                    .parentTaskRun(taskRun)
-                    .build());
+        if (this.wait) { // we only compute outputs if we wait for the subflow
+            boolean isOutputsAllowed = runContext
+                .<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
+                .orElse(true);
+
+            final Map<String, Object> subflowOutputs = Optional
+                .ofNullable(flow.getOutputs())
+                .map(outputs -> outputs
+                    .stream()
+                    .collect(Collectors.toMap(
+                        io.kestra.core.models.flows.Output::getId,
+                        io.kestra.core.models.flows.Output::getValue)
+                    )
+                )
+                .orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
+
+            if (subflowOutputs != null) {
+                try {
+                    Map<String, Object> outputs = runContext.render(subflowOutputs);
+                    FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
+                    if (flow.getOutputs() != null && flowInputOutput != null) {
+                        outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
+                    }
+                    builder.outputs(outputs);
+                } catch (Exception e) {
+                    runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
+                    var state = State.Type.fail(this);
+                    Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
+                    taskRun = taskRun
+                        .withState(state)
+                        .withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
+                        .withOutputs(variables);
+
+                    return Optional.of(SubflowExecutionResult.builder()
+                        .executionId(execution.getId())
+                        .state(State.Type.FAILED)
+                        .parentTaskRun(taskRun)
+                        .build());
+                }
             }
         }


@@ -9,11 +9,11 @@ import io.kestra.core.runners.DefaultRunContext;
 import io.kestra.core.runners.RunContext;
 import io.kestra.core.services.FlowService;
 import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.validation.constraints.NotNull;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.experimental.SuperBuilder;
-import org.codehaus.commons.nullanalysis.NotNull;

 import java.util.NoSuchElementException;


@@ -103,8 +103,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {

         KVStore kvStore = runContext.namespaceKv(renderedNamespace);

-        if (kvType != null && renderedValue instanceof String renderedValueStr) {
-            renderedValue = switch (runContext.render(kvType).as(KVType.class).orElseThrow()) {
+        if (kvType != null){
+            KVType renderedKvType = runContext.render(kvType).as(KVType.class).orElseThrow();
+            if (renderedValue instanceof String renderedValueStr) {
+                renderedValue = switch (renderedKvType) {
                 case NUMBER -> JacksonMapper.ofJson().readValue(renderedValueStr, Number.class);
                 case BOOLEAN -> Boolean.parseBoolean((String) renderedValue);
                 case DATETIME, DATE -> Instant.parse(renderedValueStr);
@@ -112,7 +114,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {
                 case JSON -> JacksonMapper.toObject(renderedValueStr);
                 default -> renderedValue;
                 };
+            } else if (renderedValue instanceof Number valueNumber && renderedKvType == KVType.STRING) {
+                renderedValue = valueNumber.toString();
             }
+        }

         kvStore.put(renderedKey, new KVValueAndMetadata(
             new KVMetadata(
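
The KV `Set` hunk widens the type coercion: string values are still parsed into the declared `kvType`, and a numeric value declared as `STRING` is now stringified rather than stored as a number. A reduced stand-alone model of that logic; the enum and method here are simplified stand-ins, and the real task uses `JacksonMapper` and supports more types:

    public class KvCoercionSketch {
        enum KVType { STRING, NUMBER, BOOLEAN }

        // Reduced model of the Set task's coercion after the change: strings are
        // parsed into the declared type, and a Number declared as STRING is now
        // stringified rather than stored as a number.
        static Object coerce(Object value, KVType type) {
            if (value instanceof String s) {
                return switch (type) {
                    case NUMBER -> Double.valueOf(s);
                    case BOOLEAN -> Boolean.parseBoolean(s);
                    default -> value;
                };
            } else if (value instanceof Number n && type == KVType.STRING) {
                return n.toString();
            }
            return value;
        }

        public static void main(String[] args) {
            System.out.println(coerce("42", KVType.NUMBER)); // 42.0
            System.out.println(coerce(42, KVType.STRING));   // 42 (as a String)
        }
    }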

@@ -56,7 +56,8 @@ public class OverrideRetryInterceptor implements MethodInterceptor<Object, Objec
             retry.get("delay", Duration.class).orElse(Duration.ofSeconds(1)),
             retry.get("maxDelay", Duration.class).orElse(null),
             new DefaultRetryPredicate(resolveIncludes(retry, "includes"), resolveIncludes(retry, "excludes")),
-            Throwable.class
+            Throwable.class,
+            0
         );

         MutableConvertibleValues<Object> attrs = context.getAttributes();

@@ -0,0 +1,11 @@
+<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
+<g clip-path="url(#clip0_1765_9330)">
+<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
+<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
+<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
+<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
+<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
+<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
+<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
+</g>
+</svg>

@@ -0,0 +1,11 @@
+<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
+<g clip-path="url(#clip0_1765_9330)">
+<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
+<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
+<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
+<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
+<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
+<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
+<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
+</g>
+</svg>

@@ -112,7 +112,7 @@ class JsonSchemaGeneratorTest {

     var requiredWithDefault = definitions.get("io.kestra.core.docs.JsonSchemaGeneratorTest-RequiredWithDefault");
     assertThat(requiredWithDefault, is(notNullValue()));
-    assertThat((List<String>) requiredWithDefault.get("required"), not(contains("requiredWithDefault")));
+    assertThat((List<String>) requiredWithDefault.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));

     var properties = (Map<String, Map<String, Object>>) flow.get("properties");
     var listeners = properties.get("listeners");
@@ -253,7 +253,7 @@ class JsonSchemaGeneratorTest {
     void requiredAreRemovedIfThereIsADefault() {
         Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, RequiredWithDefault.class);
         assertThat(generate, is(not(nullValue())));
-        assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault")));
+        assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
         assertThat((List<String>) generate.get("required"), containsInAnyOrder("requiredWithNoDefault"));
     }

@@ -466,6 +466,11 @@ class JsonSchemaGeneratorTest {
     @Builder.Default
     private Property<TaskWithEnum.TestClass> requiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test").build());

+    @PluginProperty
+    @NotNull
+    @Builder.Default
+    private Property<TaskWithEnum.TestClass> anotherRequiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test2").build());
+
     @PluginProperty
     @NotNull
     private Property<TaskWithEnum.TestClass> requiredWithNoDefault;

@@ -44,6 +44,7 @@ import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.*;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;

 import static io.kestra.core.models.flows.FlowScope.USER;
@@ -740,4 +741,16 @@ public abstract class AbstractExecutionRepositoryTest {
         executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
         assertThat(executions.size()).isEqualTo(0L);
     }
+
+    @Test
+    protected void shouldReturnLastExecutionsWhenInputsAreNull() {
+        inject();
+
+        List<Execution> lastExecutions = executionRepository.lastExecutions(MAIN_TENANT, null);
+
+        assertThat(lastExecutions).isNotEmpty();
+        Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet());
+        assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
+    }
+
 }

@@ -387,6 +387,13 @@ public abstract class AbstractRunnerTest {
         forEachItemCaseTest.forEachItemInIf();
     }

+    @Test
+    @LoadFlows({"flows/valids/for-each-item-subflow-after-execution.yaml",
+        "flows/valids/for-each-item-after-execution.yaml"})
+    protected void forEachItemWithAfterExecution() throws Exception {
+        forEachItemCaseTest.forEachItemWithAfterExecution();
+    }
+
     @Test
     @LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
     void concurrencyCancel() throws Exception {
@@ -423,6 +430,24 @@ public abstract class AbstractRunnerTest {
         flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
     }

+    @Test
+    @LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
+    protected void concurrencyQueueRestarted() throws Exception {
+        flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
+    }
+
+    @Test
+    @LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
+    void concurrencyQueueAfterExecution() throws Exception {
+        flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
+    }
+
+    @Test
+    @LoadFlows({"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"})
+    void flowConcurrencySubflow() throws Exception {
+        flowConcurrencyCaseTest.flowConcurrencySubflow();
+    }
+
     @Test
     @ExecuteFlow("flows/valids/executable-fail.yml")
     void badExecutable(Execution execution) {

@@ -442,4 +442,22 @@ class ExecutionServiceTest {
         assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
         assertThat(killed.getState().getHistories()).hasSize(5);
     }
+
+    @Test
+    @LoadFlows({"flows/valids/change-state-errors.yaml"})
+    void changeStateWithErrorBranch() throws Exception {
+        Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "change-state-errors");
+        Flow flow = flowRepository.findByExecution(execution);
+
+        assertThat(execution.getTaskRunList()).hasSize(3);
+        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
+
+        Execution restart = executionService.changeTaskRunState(execution, flow, execution.findTaskRunsByTaskId("make_error").getFirst().getId(), State.Type.SUCCESS);
+
+        assertThat(restart.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
+        assertThat(restart.getMetadata().getAttemptNumber()).isEqualTo(2);
+        assertThat(restart.getState().getHistories()).hasSize(4);
+        assertThat(restart.getTaskRunList()).hasSize(2);
+        assertThat(restart.findTaskRunsByTaskId("make_error").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+    }
 }

@@ -8,6 +8,7 @@ import io.kestra.core.queues.QueueException;
 import io.kestra.core.queues.QueueFactoryInterface;
 import io.kestra.core.queues.QueueInterface;
 import io.kestra.core.repositories.FlowRepositoryInterface;
+import io.kestra.core.services.ExecutionService;
 import io.kestra.core.storages.StorageInterface;
 import io.kestra.core.utils.TestsUtils;
 import jakarta.inject.Inject;
@@ -53,6 +54,9 @@ public class FlowConcurrencyCaseTest {
     @Named(QueueFactoryInterface.EXECUTION_NAMED)
     protected QueueInterface<Execution> executionQueue;

+    @Inject
+    private ExecutionService executionService;
+
     public void flowConcurrencyCancel() throws TimeoutException, QueueException, InterruptedException {
         Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
         Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel");
@@ -278,6 +282,161 @@ public class FlowConcurrencyCaseTest {
         assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
     }

+    public void flowConcurrencyQueueRestarted() throws Exception {
+        Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
+        Flow flow = flowRepository
+            .findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", Optional.empty())
+            .orElseThrow();
+        Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
+        executionQueue.emit(execution2);
+
+        assertThat(execution1.getState().isRunning()).isTrue();
+        assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
+
+        var executionResult1 = new AtomicReference<Execution>();
+        var executionResult2 = new AtomicReference<Execution>();
+
+        CountDownLatch latch1 = new CountDownLatch(2);
+        AtomicReference<Execution> failedExecution = new AtomicReference<>();
+        CountDownLatch latch2 = new CountDownLatch(1);
+        CountDownLatch latch3 = new CountDownLatch(1);
+
+        Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
+            if (e.getLeft().getId().equals(execution1.getId())) {
+                executionResult1.set(e.getLeft());
+                if (e.getLeft().getState().getCurrent() == Type.FAILED) {
+                    failedExecution.set(e.getLeft());
+                    latch1.countDown();
+                }
+            }
+
+            if (e.getLeft().getId().equals(execution2.getId())) {
+                executionResult2.set(e.getLeft());
+                if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
+                    latch2.countDown();
+                }
+                if (e.getLeft().getState().getCurrent() == Type.FAILED) {
+                    latch3.countDown();
+                }
+            }
+        });
+
+        assertTrue(latch2.await(1, TimeUnit.MINUTES));
+        assertThat(failedExecution.get()).isNotNull();
+        // here the first fail and the second is now running.
+        // we restart the first one, it should be queued then fail again.
+        Execution restarted = executionService.restart(failedExecution.get(), null);
+        executionQueue.emit(restarted);
+
+        assertTrue(latch3.await(1, TimeUnit.MINUTES));
+        assertTrue(latch1.await(1, TimeUnit.MINUTES));
+        receive.blockLast();
+
+        assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(Type.FAILED);
+        // it should have been queued after restarted
+        assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED)).isTrue();
+        assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.QUEUED)).isTrue();
+        assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(Type.FAILED);
+        assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
+        assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
+        assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
+    }
+
+    public void flowConcurrencyQueueAfterExecution() throws TimeoutException, QueueException, InterruptedException {
+        Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
+        Flow flow = flowRepository
+            .findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", Optional.empty())
+            .orElseThrow();
+        Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
+        executionQueue.emit(execution2);
+
+        assertThat(execution1.getState().isRunning()).isTrue();
+        assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
+
+        var executionResult1 = new AtomicReference<Execution>();
+        var executionResult2 = new AtomicReference<Execution>();
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        CountDownLatch latch3 = new CountDownLatch(1);
+
+        Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
+            if (e.getLeft().getId().equals(execution1.getId())) {
+                executionResult1.set(e.getLeft());
+                if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
+                    latch1.countDown();
+                }
+            }
+
+            if (e.getLeft().getId().equals(execution2.getId())) {
+                executionResult2.set(e.getLeft());
+                if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
+                    latch2.countDown();
+                }
+                if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
+                    latch3.countDown();
+                }
+            }
+        });
+
+        assertTrue(latch1.await(1, TimeUnit.MINUTES));
+        assertTrue(latch2.await(1, TimeUnit.MINUTES));
+        assertTrue(latch3.await(1, TimeUnit.MINUTES));
+        receive.blockLast();
+
+        assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
+        assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
+        assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
+    }
+
+    public void flowConcurrencySubflow() throws TimeoutException, QueueException, InterruptedException {
+        CountDownLatch successLatch = new CountDownLatch(1);
+        CountDownLatch canceledLatch = new CountDownLatch(1);
+        Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
+            if (e.getLeft().getFlowId().equals("flow-concurrency-cancel")) {
+                if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
+                    successLatch.countDown();
+                }
+                if (e.getLeft().getState().getCurrent() == Type.CANCELLED) {
+                    canceledLatch.countDown();
+                }
+            }
+
+            // FIXME we should fail if we receive the cancel execution again but on Kafka it happens
+        });
+
+        Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-subflow", null, null, Duration.ofSeconds(30));
+        Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-subflow");
+
+        assertThat(execution1.getState().isRunning()).isTrue();
+        assertThat(execution2.getState().getCurrent()).isEqualTo(Type.SUCCESS);
+
+        // assert we have one canceled subflow and one in success
+        assertTrue(canceledLatch.await(1, TimeUnit.MINUTES));
+        assertTrue(successLatch.await(1, TimeUnit.MINUTES));
+        receive.blockLast();
+
+        // run another execution to be sure that everything work (purge is correctly done)
+        CountDownLatch newSuccessLatch = new CountDownLatch(1);
+        Flux<Execution> secondReceive = TestsUtils.receive(executionQueue, e -> {
+            if (e.getLeft().getFlowId().equals("flow-concurrency-cancel")) {
+                if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
+                    newSuccessLatch.countDown();
+                }
+            }
+
+            // FIXME we should fail if we receive the cancel execution again but on Kafka it happens
+        });
+        Execution execution3 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-subflow");
+        assertThat(execution3.getState().getCurrent()).isEqualTo(Type.SUCCESS);
+
+        // assert we have two successful subflow
+        assertTrue(newSuccessLatch.await(1, TimeUnit.MINUTES));
+        secondReceive.blockLast();
+    }
+
     private URI storageUpload() throws URISyntaxException, IOException {
         File tempFile = File.createTempFile("file", ".txt");

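
The three new concurrency tests all follow the same coordination pattern: a queue listener records the executions it sees and counts down latches keyed to specific states, while the test thread blocks on `await()` with a timeout instead of polling or sleeping. A minimal sketch of that pattern:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;

    public class LatchPatternSketch {
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);
            AtomicReference<String> seen = new AtomicReference<>();

            // The listener records what it saw and counts down; the test thread
            // blocks on await() with a timeout instead of sleeping or polling.
            Thread producer = new Thread(() -> {
                seen.set("SUCCESS");
                latch.countDown();
            });
            producer.start();

            boolean completed = latch.await(1, TimeUnit.MINUTES);
            System.out.println(completed + " " + seen.get()); // true SUCCESS
        }
    }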
|
|||||||
@@ -83,4 +83,24 @@ class RunContextPropertyTest {
         runContextProperty = new RunContextProperty<>(Property.<Map<String, String>>builder().expression("{ \"key\": \"{{ key }}\"}").build(), runContext);
         assertThat(runContextProperty.asMap(String.class, String.class, Map.of("key", "value"))).containsEntry("key", "value");
     }
+
+    @Test
+    void asShouldReturnCachedRenderedProperty() throws IllegalVariableEvaluationException {
+        var runContext = runContextFactory.of();
+
+        var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
+
+        assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
+        assertThat(runContextProperty.as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value1"));
+    }
+
+    @Test
+    void asShouldNotReturnCachedRenderedPropertyWithSkipCache() throws IllegalVariableEvaluationException {
+        var runContext = runContextFactory.of();
+
+        var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
+
+        assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
+        assertThat(runContextProperty.skipCache().as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value2"));
+    }
 }
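Editor's note: the two tests above pin down the caching contract of RunContextProperty: the first successful render is memoized, and skipCache() forces a fresh render. A minimal stand-alone model of that contract follows (the CachedRender class and its brace-stripping renderer are hypothetical, not Kestra's implementation):

    import java.util.Map;

    // Hypothetical model of the memoization the tests assert: the first rendering
    // wins on subsequent calls unless the caller explicitly skips the cache.
    final class CachedRender {
        private final String expression;
        private String cached; // holds the first successful render

        CachedRender(String expression) {
            this.expression = expression;
        }

        String render(Map<String, String> vars, boolean skipCache) {
            if (!skipCache && cached != null) {
                return cached; // "value1" is returned even if vars now carry "value2"
            }
            String key = expression.replace("{{", "").replace("}}", "").trim();
            cached = vars.getOrDefault(key, expression);
            return cached;
        }

        public static void main(String[] args) {
            CachedRender p = new CachedRender("{{ variable }}");
            System.out.println(p.render(Map.of("variable", "value1"), false)); // value1
            System.out.println(p.render(Map.of("variable", "value2"), false)); // value1 (cached)
            System.out.println(p.render(Map.of("variable", "value2"), true));  // value2 (cache skipped)
        }
    }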
@@ -140,7 +140,7 @@ class RunContextTest {
         List<LogEntry> logs = new CopyOnWriteArrayList<>();
         Flux<LogEntry> receive = TestsUtils.receive(workerTaskLogQueue, either -> logs.add(either.getLeft()));

-        char[] chars = new char[1024 * 11];
+        char[] chars = new char[1024 * 16];
         Arrays.fill(chars, 'a');

         Map<String, Object> inputs = new HashMap<>(InputsTest.inputs);
@@ -5,6 +5,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
 import io.kestra.core.models.annotations.Plugin;
 import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.flows.State;
+import io.kestra.core.models.property.Property;
 import io.kestra.core.models.tasks.RunnableTask;
 import io.kestra.core.models.tasks.Task;
 import io.kestra.core.queues.QueueException;
@@ -18,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;

+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;

@@ -77,8 +79,12 @@ public class TaskCacheTest {
     @Plugin
     public static class CounterTask extends Task implements RunnableTask<CounterTask.Output> {

+        private String workingDir;
+
         @Override
         public Output run(RunContext runContext) throws Exception {
+            Map<String, Object> variables = Map.of("workingDir", runContext.workingDir().path().toString());
+            runContext.render(this.workingDir, variables);
             return Output.builder()
                 .counter(COUNTER.incrementAndGet())
                 .build();
@@ -6,6 +6,8 @@ import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.flows.State;
 import org.junit.jupiter.api.Test;

+import java.util.List;
+
 import static org.assertj.core.api.Assertions.assertThat;

 @KestraTest(startRunner = true)
@@ -31,4 +33,15 @@ class TaskWithRunIfTest {
         assertThat(execution.findTaskRunsByTaskId("log_test").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
     }

+    @Test
+    @ExecuteFlow("flows/valids/task-runif-executionupdating.yml")
+    void executionUpdatingTask(Execution execution) {
+        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(execution.getTaskRunList()).hasSize(5);
+        assertThat(execution.findTaskRunsByTaskId("skipSetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
+        assertThat(execution.findTaskRunsByTaskId("skipUnsetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SKIPPED);
+        assertThat(execution.findTaskRunsByTaskId("unsetVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(execution.findTaskRunsByTaskId("setVariables").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(execution.getVariables()).containsEntry("list", List.of(42));
+    }
 }
@@ -372,4 +372,44 @@ class FlowServiceTest {

         assertThat(exceptions.size()).isZero();
     }
+
+    @Test
+    void shouldReturnValidationForRunnablePropsOnFlowable() {
+        // Given
+        String source = """
+            id: dolphin_164914
+            namespace: company.team
+
+            tasks:
+              - id: for
+                type: io.kestra.plugin.core.flow.ForEach
+                values: [1, 2, 3]
+                workerGroup:
+                  key: toto
+                timeout: PT10S
+                taskCache:
+                  enabled: true
+                tasks:
+                  - id: hello
+                    type: io.kestra.plugin.core.log.Log
+                    message: Hello World! 🚀
+                    workerGroup:
+                      key: toto
+                    timeout: PT10S
+                    taskCache:
+                      enabled: true
+            """;
+
+        // When
+        List<ValidateConstraintViolation> results = flowService.validate("my-tenant", source);
+
+        // Then
+        assertThat(results).hasSize(1);
+        assertThat(results.getFirst().getWarnings()).hasSize(3);
+        assertThat(results.getFirst().getWarnings()).containsExactlyInAnyOrder(
+            "The task 'for' cannot use the 'timeout' property as it's only relevant for runnable tasks.",
+            "The task 'for' cannot use the 'taskCache' property as it's only relevant for runnable tasks.",
+            "The task 'for' cannot use the 'workerGroup' property as it's only relevant for runnable tasks."
+        );
+    }
 }
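Editor's note: the expected warnings above encode one rule: flowable tasks (ForEach, If, Parallel, ...) must not carry runnable-only properties. A self-contained sketch of that rule (RunnablePropsCheck is a hypothetical helper; FlowService.validate is the real entry point):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Sketch of the validation the test pins down: a flowable task carrying any
    // runnable-only property yields one warning per offending property.
    final class RunnablePropsCheck {
        static final List<String> RUNNABLE_ONLY = List.of("timeout", "taskCache", "workerGroup");

        static List<String> warnings(String taskId, boolean flowable, Map<String, Object> props) {
            List<String> out = new ArrayList<>();
            if (flowable) {
                for (String p : RUNNABLE_ONLY) {
                    if (props.containsKey(p)) {
                        out.add("The task '" + taskId + "' cannot use the '" + p
                            + "' property as it's only relevant for runnable tasks.");
                    }
                }
            }
            return out;
        }

        public static void main(String[] args) {
            // mirrors the 'for' task of the test flow: three warnings expected
            System.out.println(warnings("for", true,
                Map.of("timeout", "PT10S", "taskCache", Map.of("enabled", true), "workerGroup", Map.of("key", "toto"))));
        }
    }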
@@ -0,0 +1,142 @@
+package io.kestra.core.services;
+
+import io.kestra.core.context.TestRunContextFactory;
+import io.kestra.core.junit.annotations.KestraTest;
+import io.kestra.core.models.Label;
+import io.kestra.core.models.executions.Execution;
+import io.kestra.core.models.executions.ExecutionKind;
+import io.kestra.core.models.flows.Flow;
+import io.kestra.core.models.flows.State;
+import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
+import io.kestra.core.utils.IdUtils;
+import io.kestra.plugin.core.log.Log;
+import jakarta.inject.Inject;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Optional;
+
+import static io.kestra.core.repositories.AbstractFlowRepositoryTest.TEST_NAMESPACE;
+import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@KestraTest
+class FlowTriggerServiceTest {
+    public static final List<Label> EMPTY_LABELS = List.of();
+    public static final Optional<MultipleConditionStorageInterface> EMPTY_MULTIPLE_CONDITION_STORAGE = Optional.empty();
+
+    @Inject
+    private TestRunContextFactory runContextFactory;
+    @Inject
+    private ConditionService conditionService;
+    @Inject
+    private FlowService flowService;
+    private FlowTriggerService flowTriggerService;
+
+    @BeforeEach
+    void setUp() {
+        flowTriggerService = new FlowTriggerService(conditionService, runContextFactory, flowService);
+    }
+
+    @Test
+    void computeExecutionsFromFlowTriggers_ok() {
+        var simpleFlow = aSimpleFlow();
+        var flowWithFlowTrigger = Flow.builder()
+            .id("flow-with-flow-trigger")
+            .namespace(TEST_NAMESPACE)
+            .tenantId(MAIN_TENANT)
+            .tasks(List.of(simpleLogTask()))
+            .triggers(List.of(
+                flowTriggerWithNoConditions()
+            ))
+            .build();
+
+        var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.SUCCESS);
+
+        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
+            simpleFlowExecution,
+            List.of(simpleFlow, flowWithFlowTrigger),
+            EMPTY_MULTIPLE_CONDITION_STORAGE
+        );
+
+        assertThat(resultingExecutionsToRun).size().isEqualTo(1);
+        assertThat(resultingExecutionsToRun.get(0).getFlowId()).isEqualTo(flowWithFlowTrigger.getId());
+    }
+
+    @Test
+    void computeExecutionsFromFlowTriggers_filteringOutCreatedExecutions() {
+        var simpleFlow = aSimpleFlow();
+        var flowWithFlowTrigger = Flow.builder()
+            .id("flow-with-flow-trigger")
+            .namespace(TEST_NAMESPACE)
+            .tenantId(MAIN_TENANT)
+            .tasks(List.of(simpleLogTask()))
+            .triggers(List.of(
+                flowTriggerWithNoConditions()
+            ))
+            .build();
+
+        var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.CREATED);
+
+        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
+            simpleFlowExecution,
+            List.of(simpleFlow, flowWithFlowTrigger),
+            EMPTY_MULTIPLE_CONDITION_STORAGE
+        );
+
+        assertThat(resultingExecutionsToRun).size().isEqualTo(0);
+    }
+
+    @Test
+    void computeExecutionsFromFlowTriggers_filteringOutTestExecutions() {
+        var simpleFlow = aSimpleFlow();
+        var flowWithFlowTrigger = Flow.builder()
+            .id("flow-with-flow-trigger")
+            .namespace(TEST_NAMESPACE)
+            .tenantId(MAIN_TENANT)
+            .tasks(List.of(simpleLogTask()))
+            .triggers(List.of(
+                flowTriggerWithNoConditions()
+            ))
+            .build();
+
+        var simpleFlowExecutionComingFromATest = Execution.newExecution(simpleFlow, EMPTY_LABELS)
+            .withState(State.Type.SUCCESS)
+            .toBuilder()
+            .kind(ExecutionKind.TEST)
+            .build();
+
+        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
+            simpleFlowExecutionComingFromATest,
+            List.of(simpleFlow, flowWithFlowTrigger),
+            EMPTY_MULTIPLE_CONDITION_STORAGE
+        );
+
+        assertThat(resultingExecutionsToRun).size().isEqualTo(0);
+    }
+
+    private static Flow aSimpleFlow() {
+        return Flow.builder()
+            .id("simple-flow")
+            .namespace(TEST_NAMESPACE)
+            .tenantId(MAIN_TENANT)
+            .tasks(List.of(simpleLogTask()))
+            .build();
+    }
+
+    private static io.kestra.plugin.core.trigger.Flow flowTriggerWithNoConditions() {
+        return io.kestra.plugin.core.trigger.Flow.builder()
+            .id("flowTrigger")
+            .type(io.kestra.plugin.core.trigger.Flow.class.getName())
+            .build();
+    }
+
+    private static Log simpleLogTask() {
+        return Log.builder()
+            .id(IdUtils.create())
+            .type(Log.class.getName())
+            .message("Hello World")
+            .build();
+    }
+}
@@ -38,7 +38,7 @@ class ExitTest {
     @ExecuteFlow("flows/valids/exit.yaml")
     void shouldExitTheExecution(Execution execution) {
         assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.WARNING);
-        assertThat(execution.getTaskRunList().size()).isEqualTo(2);
+        assertThat(execution.getTaskRunList()).hasSize(2);
         assertThat(execution.getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.WARNING);
     }

@@ -68,4 +68,14 @@ class ExitTest {
         assertThat(killedExecution.get().getTaskRunList().get(1).getState().getCurrent()).isEqualTo(State.Type.KILLED);
         receive.blockLast();
     }
+
+    @Test
+    @ExecuteFlow("flows/valids/exit-nested.yaml")
+    void shouldExitAndFailNestedIf(Execution execution) {
+        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
+        assertThat(execution.getTaskRunList()).hasSize(4);
+        assertThat(execution.findTaskRunsByTaskId("if_some_bool").getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
+        assertThat(execution.findTaskRunsByTaskId("nested_bool_check").getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
+        assertThat(execution.findTaskRunsByTaskId("nested_was_false").getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
+    }
 }
@@ -372,6 +372,51 @@ public class ForEachItemCaseTest {
         assertThat(correlationId.get().value()).isEqualTo(execution.getId());
     }

+    public void forEachItemWithAfterExecution() throws TimeoutException, InterruptedException, URISyntaxException, IOException, QueueException {
+        CountDownLatch countDownLatch = new CountDownLatch(26);
+        AtomicReference<Execution> triggered = new AtomicReference<>();
+
+        Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
+            Execution execution = either.getLeft();
+            if (execution.getFlowId().equals("for-each-item-subflow-after-execution") && execution.getState().getCurrent().isTerminated()) {
+                triggered.set(execution);
+                countDownLatch.countDown();
+            }
+        });
+
+        URI file = storageUpload();
+        Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
+        Execution execution = runnerUtils.runOne(MAIN_TENANT, TEST_NAMESPACE, "for-each-item-after-execution", null,
+            (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs),
+            Duration.ofSeconds(30));
+
+        // we should have triggered 26 subflows
+        assertThat(countDownLatch.await(1, TimeUnit.MINUTES)).isTrue();
+        receive.blockLast();
+
+        // assert on the main flow execution
+        assertThat(execution.getTaskRunList()).hasSize(5);
+        assertThat(execution.getTaskRunList().get(2).getAttempts()).hasSize(1);
+        assertThat(execution.getTaskRunList().get(2).getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        Map<String, Object> outputs = execution.getTaskRunList().get(2).getOutputs();
+        assertThat(outputs.get("numberOfBatches")).isEqualTo(26);
+        assertThat(outputs.get("iterations")).isNotNull();
+        Map<String, Integer> iterations = (Map<String, Integer>) outputs.get("iterations");
+        assertThat(iterations.get("CREATED")).isZero();
+        assertThat(iterations.get("RUNNING")).isZero();
+        assertThat(iterations.get("SUCCESS")).isEqualTo(26);
+
+        // assert on the last subflow execution
+        assertThat(triggered.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(triggered.get().getFlowId()).isEqualTo("for-each-item-subflow-after-execution");
+        assertThat((String) triggered.get().getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-after-execution/executions/.*/tasks/each-split/.*\\.txt");
+        assertThat(triggered.get().getTaskRunList()).hasSize(2);
+        Optional<Label> correlationId = triggered.get().getLabels().stream().filter(label -> label.key().equals(Label.CORRELATION_ID)).findAny();
+        assertThat(correlationId.isPresent()).isTrue();
+        assertThat(correlationId.get().value()).isEqualTo(execution.getId());
+    }
+
     private URI storageUpload() throws URISyntaxException, IOException {
         File tempFile = File.createTempFile("file", ".txt");

@@ -58,4 +58,15 @@ class ParallelTest {
         assertThat(execution.findTaskRunsByTaskId("a2").getFirst().getState().getStartDate().isAfter(execution.findTaskRunsByTaskId("a1").getFirst().getState().getEndDate().orElseThrow())).isTrue();
         assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getStartDate().isAfter(execution.findTaskRunsByTaskId("e1").getFirst().getState().getEndDate().orElseThrow())).isTrue();
     }

+    @Test
+    @ExecuteFlow("flows/valids/parallel-fail-with-flowable.yaml")
+    void parallelFailWithFlowable(Execution execution) {
+        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
+        assertThat(execution.getTaskRunList()).hasSize(5);
+        // all tasks must be terminated except the Sleep, which will end later as everything is concurrent
+        execution.getTaskRunList().stream()
+            .filter(taskRun -> !"sleep".equals(taskRun.getTaskId()))
+            .forEach(run -> assertThat(run.getState().isTerminated()).isTrue());
+    }
 }
@@ -4,16 +4,24 @@ import io.kestra.core.junit.annotations.KestraTest;
 import io.kestra.core.junit.annotations.LoadFlows;
 import io.kestra.core.models.Label;
 import io.kestra.core.models.executions.Execution;
+import io.kestra.core.models.flows.State;
 import io.kestra.core.queues.QueueException;
+import io.kestra.core.queues.QueueFactoryInterface;
+import io.kestra.core.queues.QueueInterface;
 import io.kestra.core.repositories.ExecutionRepositoryInterface;
 import io.kestra.core.runners.RunnerUtils;
 import jakarta.inject.Inject;
+import jakarta.inject.Named;
 import org.junit.jupiter.api.Test;

+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;

 import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;

 @KestraTest(startRunner = true)
 class SubflowRunnerTest {
@@ -24,6 +32,10 @@ class SubflowRunnerTest {
     @Inject
     private ExecutionRepositoryInterface executionRepository;

+    @Inject
+    @Named(QueueFactoryInterface.EXECUTION_NAMED)
+    protected QueueInterface<Execution> executionQueue;
+
     @Test
     @LoadFlows({"flows/valids/subflow-inherited-labels-child.yaml", "flows/valids/subflow-inherited-labels-parent.yaml"})
     void inheritedLabelsAreOverridden() throws QueueException, TimeoutException {
@@ -50,4 +62,29 @@ class SubflowRunnerTest {
             new Label("parentFlowLabel2", "value2") // inherited from the parent flow
         );
     }
+
+    @Test
+    @LoadFlows({"flows/valids/subflow-parent-no-wait.yaml", "flows/valids/subflow-child-with-output.yaml"})
+    void subflowOutputWithoutWait() throws QueueException, TimeoutException, InterruptedException {
+        AtomicReference<Execution> childExecution = new AtomicReference<>();
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        Runnable closing = executionQueue.receive(either -> {
+            if (either.isLeft() && either.getLeft().getFlowId().equals("subflow-child-with-output") && either.getLeft().getState().isTerminated()) {
+                childExecution.set(either.getLeft());
+                countDownLatch.countDown();
+            }
+        });
+
+        Execution parentExecution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-parent-no-wait");
+        String childExecutionId = (String) parentExecution.findTaskRunsByTaskId("subflow").getFirst().getOutputs().get("executionId");
+        assertThat(childExecutionId).isNotBlank();
+        assertThat(parentExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(parentExecution.getTaskRunList()).hasSize(1);
+
+        assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
+        assertThat(childExecution.get().getId()).isEqualTo(childExecutionId);
+        assertThat(childExecution.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
+        assertThat(childExecution.get().getTaskRunList()).hasSize(1);
+        closing.run();
+    }
 }
|||||||
@@ -206,14 +206,20 @@ class SetTest {
|
|||||||
|
|
||||||
kv = createAndPerformSetTask("[{\"some\":\"value\"},{\"another\":\"value\"}]", KVType.JSON);
|
kv = createAndPerformSetTask("[{\"some\":\"value\"},{\"another\":\"value\"}]", KVType.JSON);
|
||||||
assertThat(kv.getValue(TEST_KEY).orElseThrow().value()).isEqualTo(List.of(Map.of("some", "value"), Map.of("another", "value")));
|
assertThat(kv.getValue(TEST_KEY).orElseThrow().value()).isEqualTo(List.of(Map.of("some", "value"), Map.of("another", "value")));
|
||||||
|
|
||||||
|
kv = createAndPerformSetTask("{{ 200 }}", KVType.STRING);
|
||||||
|
assertThat(kv.getValue(TEST_KEY).orElseThrow().value()).isEqualTo("200");
|
||||||
|
|
||||||
|
kv = createAndPerformSetTask("{{ 200.1 }}", KVType.STRING);
|
||||||
|
assertThat(kv.getValue(TEST_KEY).orElseThrow().value()).isEqualTo("200.1");
|
||||||
}
|
}
|
||||||
|
|
||||||
private KVStore createAndPerformSetTask(String value, KVType type) throws Exception {
|
private KVStore createAndPerformSetTask(String value, KVType type) throws Exception {
|
||||||
Set set = Set.builder()
|
Set set = Set.builder()
|
||||||
.id(Set.class.getSimpleName())
|
.id(Set.class.getSimpleName())
|
||||||
.type(Set.class.getName())
|
.type(Set.class.getName())
|
||||||
.key(new Property<>(TEST_KEY))
|
.key(Property.ofValue(TEST_KEY))
|
||||||
.value(new Property<>(value))
|
.value(value.contains("{{") ? Property.ofExpression(value) : Property.ofValue(value))
|
||||||
.kvType(Property.ofValue(type))
|
.kvType(Property.ofValue(type))
|
||||||
.build();
|
.build();
|
||||||
final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, set, null);
|
final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, set, null);
|
||||||
|
|||||||
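Editor's note: the builder change above separates literal from templated properties. A self-contained model of the dispatch (the Prop record is illustrative; Property.ofValue and Property.ofExpression are the real Kestra factories):

    // Minimal model of the dispatch in createAndPerformSetTask: a raw string that
    // contains Pebble markers ("{{ ... }}") must become an expression rendered at
    // runtime, everything else is a plain value. Names are illustrative only.
    final class PropSketch {
        record Prop(String raw, boolean isExpression) {
            static Prop of(String raw) {
                return new Prop(raw, raw.contains("{{"));
            }
        }

        public static void main(String[] args) {
            System.out.println(Prop.of("my-key").isExpression());    // false -> Property.ofValue
            System.out.println(Prop.of("{{ 200 }}").isExpression()); // true  -> Property.ofExpression
        }
    }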
@@ -4,6 +4,7 @@ namespace: io.kestra.tests
 tasks:
   - id: cache
     type: io.kestra.core.runners.TaskCacheTest$CounterTask
+    workingDir: "{{workingDir}}"
     taskCache:
       enabled: true
       ttl: PT1S
@@ -0,0 +1,18 @@
+id: change-state-errors
+namespace: io.kestra.tests
+
+errors:
+  - id: print_error_log
+    type: io.kestra.plugin.core.log.Log
+    message: "Failure alert for flow {{ flow.namespace }}.{{ flow.id }} with ID {{ execution.id }}"
+
+tasks:
+  - id: before_task
+    type: io.kestra.plugin.core.debug.Return
+    format: "this always works"
+  - id: make_error
+    type: io.kestra.plugin.core.debug.Return
+    format: "{{error}}"
+  - id: after-task
+    type: io.kestra.plugin.core.debug.Return
+    format: "after"
core/src/test/resources/flows/valids/exit-nested.yaml (new file, 36 lines)
@@ -0,0 +1,36 @@
+id: exit-nested
+namespace: io.kestra.tests
+
+inputs:
+  - id: someBool
+    type: BOOL
+    defaults: true
+
+  - id: secondBool
+    type: BOOL
+    defaults: false
+
+tasks:
+  - id: if_some_bool
+    type: io.kestra.plugin.core.flow.If
+    condition: "{{ inputs.someBool }}"
+    then:
+      - id: was_true
+        type: io.kestra.plugin.core.log.Log
+        message: The value was true
+
+      - id: nested_bool_check
+        type: io.kestra.plugin.core.flow.If
+        condition: "{{ inputs.secondBool }}"
+        then:
+          - id: was_also_true
+            type: io.kestra.plugin.core.log.Log
+            message: Was also true
+        else:
+          - id: nested_was_false
+            type: io.kestra.plugin.core.execution.Exit
+            state: FAILED
+    else:
+      - id: was_false
+        type: io.kestra.plugin.core.execution.Exit
+        state: FAILED
@@ -0,0 +1,17 @@
+id: flow-concurrency-queue-after-execution
+namespace: io.kestra.tests
+
+concurrency:
+  behavior: QUEUE
+  limit: 1
+
+tasks:
+  - id: sleep
+    type: io.kestra.plugin.core.flow.Sleep
+    duration: PT2S
+
+afterExecution:
+  - id: afterExecution
+    type: io.kestra.plugin.core.output.OutputValues
+    values:
+      some: value
@@ -0,0 +1,13 @@
+id: flow-concurrency-queue-fail
+namespace: io.kestra.tests
+
+concurrency:
+  behavior: QUEUE
+  limit: 1
+
+tasks:
+  - id: sleep
+    type: io.kestra.plugin.core.flow.Sleep
+    duration: PT2S
+  - id: fail
+    type: io.kestra.plugin.core.execution.Fail
@@ -0,0 +1,8 @@
+id: flow-concurrency-subflow
+namespace: io.kestra.tests
+
+tasks:
+  - id: subflow
+    type: io.kestra.plugin.core.flow.Subflow
+    namespace: io.kestra.tests
+    flowId: flow-concurrency-cancel
@@ -0,0 +1,26 @@
+id: for-each-item-after-execution
+namespace: io.kestra.tests
+
+inputs:
+  - id: file
+    type: FILE
+  - id: batch
+    type: INT
+
+tasks:
+  - id: each
+    type: io.kestra.plugin.core.flow.ForEachItem
+    items: "{{ inputs.file }}"
+    batch:
+      rows: "{{inputs.batch}}"
+    namespace: io.kestra.tests
+    flowId: for-each-item-subflow-after-execution
+    wait: true
+    transmitFailed: true
+    inputs:
+      items: "{{ taskrun.items }}"
+
+afterExecution:
+  - id: afterExecution
+    type: io.kestra.plugin.core.log.Log
+    message: Hello from afterExecution!
@@ -0,0 +1,16 @@
+id: for-each-item-subflow-after-execution
+namespace: io.kestra.tests
+
+inputs:
+  - id: items
+    type: STRING
+
+tasks:
+  - id: per-item
+    type: io.kestra.plugin.core.log.Log
+    message: "{{ inputs.items }}"
+
+afterExecution:
+  - id: afterExecution
+    type: io.kestra.plugin.core.log.Log
+    message: Hello from afterExecution!
@@ -0,0 +1,28 @@
+id: parallel-fail-with-flowable
+namespace: io.kestra.tests
+
+inputs:
+  - id: user
+    type: STRING
+    defaults: Rick Astley
+
+
+tasks:
+  - id: parallel
+    type: io.kestra.plugin.core.flow.Parallel
+    tasks:
+      - id: if-1
+        type: io.kestra.plugin.core.flow.If
+        condition: "{{ inputs.user == 'Rick Astley'}}"
+        then:
+          - id: sleep
+            type: io.kestra.plugin.core.flow.Sleep
+            duration: PT1S
+
+      - id: if-2
+        type: io.kestra.plugin.core.flow.If
+        condition: "{{ inputs.user == 'Rick Astley'}}"
+        then:
+          - id: fail_missing_variable
+            type: io.kestra.plugin.core.log.Log
+            message: "{{ vars.nonexistent_variable }}"
@@ -0,0 +1,12 @@
+id: subflow-child-with-output
+namespace: io.kestra.tests
+
+tasks:
+  - id: return
+    type: io.kestra.plugin.core.debug.Return
+    format: "Some value"
+
+outputs:
+  - id: flow_a_output
+    type: STRING
+    value: "{{ outputs.return.value }}"
@@ -0,0 +1,9 @@
+id: subflow-parent-no-wait
+namespace: io.kestra.tests
+
+tasks:
+  - id: subflow
+    type: io.kestra.plugin.core.flow.Subflow
+    namespace: io.kestra.tests
+    flowId: subflow-child-with-output
+    wait: false
@@ -0,0 +1,35 @@
+id: task-runif-executionupdating
+namespace: io.kestra.tests
+
+variables:
+  list: []
+
+tasks:
+  - id: output
+    type: io.kestra.plugin.core.output.OutputValues
+    values:
+      taskrun_data: 1
+
+  - id: unsetVariables
+    type: io.kestra.plugin.core.execution.UnsetVariables
+    runIf: "true"
+    variables:
+      - list
+
+  - id: setVariables
+    type: io.kestra.plugin.core.execution.SetVariables
+    runIf: "{{ outputs.output['values']['taskrun_data'] == 1 }}"
+    variables:
+      list: [42]
+
+  - id: skipSetVariables
+    type: io.kestra.plugin.core.execution.SetVariables
+    runIf: "false"
+    variables:
+      list: [1]
+
+  - id: skipUnsetVariables
+    type: io.kestra.plugin.core.execution.UnsetVariables
+    runIf: "{{ outputs.output['values']['taskrun_data'] == 2 }}"
+    variables:
+      - list
@@ -1,4 +1,4 @@
-version=0.24.0-SNAPSHOT
+version=0.24.8

 org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
 org.gradle.parallel=true
@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.TaskRun;
 import io.kestra.core.models.executions.Variables;
 import io.kestra.core.models.flows.State;
 import io.kestra.core.queues.QueueException;
+import io.kestra.core.queues.UnsupportedMessageException;
 import io.kestra.core.runners.WorkerTaskResult;
 import io.kestra.core.utils.IdUtils;
 import io.kestra.jdbc.runner.JdbcQueueTest;
@@ -31,7 +32,8 @@ class PostgresQueueTest extends JdbcQueueTest {
             .build();

         var exception = assertThrows(QueueException.class, () -> workerTaskResultQueue.emit(workerTaskResult));
-        assertThat(exception.getMessage()).isEqualTo("Unable to emit a message to the queue");
+        assertThat(exception).isInstanceOf(UnsupportedMessageException.class);
+        assertThat(exception.getMessage()).contains("ERROR: unsupported Unicode escape sequence");
         assertThat(exception.getCause()).isInstanceOf(DataException.class);
     }
 }
@@ -869,8 +869,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi

     @Override
     public List<Execution> lastExecutions(
-        @Nullable String tenantId,
-        List<FlowFilter> flows
+        String tenantId,
+        @Nullable List<FlowFilter> flows
     ) {
         return this.jdbcRepository
             .getDslContextWrapper()
@@ -892,14 +892,19 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
                     .and(NORMAL_KIND_CONDITION)
                     .and(field("end_date").isNotNull())
                     .and(DSL.or(
-                        flows
-                            .stream()
-                            .map(flow -> DSL.and(
-                                field("namespace").eq(flow.getNamespace()),
-                                field("flow_id").eq(flow.getId())
-                            ))
-                            .toList()
-                    ));
+                        ListUtils.emptyOnNull(flows).isEmpty() ?
+                            DSL.trueCondition()
+                            :
+                            DSL.or(
+                                flows.stream()
+                                    .map(flow -> DSL.and(
+                                        field("namespace").eq(flow.getNamespace()),
+                                        field("flow_id").eq(flow.getId())
+                                    ))
+                                    .toList()
+                            )
+                        )
+                    );

                 Table<Record2<Object, Integer>> cte = subquery.asTable("cte");

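Editor's note: the rewritten predicate treats a null or empty flows filter as "match everything"; an OR over zero conditions would otherwise select no rows. A plain-Java model of the guard (FlowFilterGuard is hypothetical; the real code builds jOOQ conditions and falls back to DSL.trueCondition()):

    import java.util.List;

    // Plain-Java model of the null/empty guard in lastExecutions.
    final class FlowFilterGuard {
        record FlowRef(String namespace, String id) {}

        static boolean matches(FlowRef row, List<FlowRef> filters) {
            if (filters == null || filters.isEmpty()) {
                return true; // mirrors DSL.trueCondition(): no filter means match all
            }
            return filters.stream().anyMatch(f ->
                f.namespace().equals(row.namespace()) && f.id().equals(row.id()));
        }

        public static void main(String[] args) {
            FlowRef row = new FlowRef("io.kestra.tests", "hello");
            System.out.println(matches(row, null));                                // true
            System.out.println(matches(row, List.of()));                           // true
            System.out.println(matches(row, List.of(new FlowRef("other", "x"))));  // false
        }
    }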
@@ -20,6 +20,12 @@ public class AbstractJdbcExecutionRunningStorage extends AbstractJdbcRepository
         this.jdbcRepository = jdbcRepository;
     }

+    public void save(ExecutionRunning executionRunning) {
+        jdbcRepository.getDslContextWrapper().transaction(
+            configuration -> save(DSL.using(configuration), executionRunning)
+        );
+    }
+
     public void save(DSLContext dslContext, ExecutionRunning executionRunning) {
         Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executionRunning);
         this.jdbcRepository.persist(executionRunning, dslContext, fields);
@@ -72,6 +72,8 @@ public class JdbcExecutor implements ExecutorInterface, Service {
     private static final ObjectMapper MAPPER = JdbcMapper.of();

     private final ScheduledExecutorService scheduledDelay = Executors.newSingleThreadScheduledExecutor();
+    private ScheduledFuture<?> executionDelayFuture;
+    private ScheduledFuture<?> monitorSLAFuture;

     @Inject
     private AbstractJdbcExecutionRepository executionRepository;
@@ -312,14 +314,14 @@ public class JdbcExecutor implements ExecutorInterface, Service {
         this.receiveCancellations.addFirst(this.executionRunningQueue.receive(Executor.class, this::executionRunningQueue));
         this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));

-        ScheduledFuture<?> scheduledDelayFuture = scheduledDelay.scheduleAtFixedRate(
+        executionDelayFuture = scheduledDelay.scheduleAtFixedRate(
             this::executionDelaySend,
             0,
             1,
             TimeUnit.SECONDS
         );

-        ScheduledFuture<?> scheduledSLAMonitorFuture = scheduledDelay.scheduleAtFixedRate(
+        monitorSLAFuture = scheduledDelay.scheduleAtFixedRate(
             this::executionSLAMonitor,
             0,
             1,
@@ -329,11 +331,13 @@ public class JdbcExecutor implements ExecutorInterface, Service {
         // look at exceptions on the scheduledDelay thread
         Thread.ofVirtual().name("jdbc-delay-exception-watcher").start(
             () -> {
-                Await.until(scheduledDelayFuture::isDone);
+                Await.until(executionDelayFuture::isDone);

                 try {
-                    scheduledDelayFuture.get();
-                } catch (ExecutionException | InterruptedException | CancellationException e) {
+                    executionDelayFuture.get();
+                } catch (CancellationException ignored) {
+
+                } catch (ExecutionException | InterruptedException e) {
                     if (e.getCause() != null && e.getCause().getClass() != CannotCreateTransactionException.class) {
                         log.error("Executor fatal exception in the scheduledDelay thread", e);
                         close();
@@ -346,11 +350,13 @@ public class JdbcExecutor implements ExecutorInterface, Service {
         // look at exceptions on the scheduledSLAMonitorFuture thread
         Thread.ofVirtual().name("jdbc-sla-monitor-exception-watcher").start(
             () -> {
-                Await.until(scheduledSLAMonitorFuture::isDone);
+                Await.until(monitorSLAFuture::isDone);

                 try {
-                    scheduledSLAMonitorFuture.get();
-                } catch (ExecutionException | InterruptedException | CancellationException e) {
+                    monitorSLAFuture.get();
+                } catch (CancellationException ignored) {
+
+                } catch (ExecutionException | InterruptedException e) {
                     if (e.getCause() != null && e.getCause().getClass() != CannotCreateTransactionException.class) {
                         log.error("Executor fatal exception in the scheduledSLAMonitor thread", e);
                         close();
@@ -546,7 +552,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
         }

         // create an SLA monitor if needed
-        if (execution.getState().getCurrent() == State.Type.CREATED && !ListUtils.isEmpty(flow.getSla())) {
+        if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && !ListUtils.isEmpty(flow.getSla())) {
             List<SLAMonitor> monitors = flow.getSla().stream()
                 .filter(ExecutionMonitoringSLA.class::isInstance)
                 .map(ExecutionMonitoringSLA.class::cast)
@@ -562,7 +568,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {

         // handle concurrency limit, we need to use a different queue to be sure that execution running
         // are processed sequentially so inside a queue with no parallelism
-        if (execution.getState().getCurrent() == State.Type.CREATED && flow.getConcurrency() != null) {
+        if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && flow.getConcurrency() != null) {
             ExecutionRunning executionRunning = ExecutionRunning.builder()
                 .tenantId(executor.getFlow().getTenantId())
                 .namespace(executor.getFlow().getNamespace())
@@ -983,28 +989,28 @@ public class JdbcExecutor implements ExecutorInterface, Service {
         }

         ExecutionRunning executionRunning = either.getLeft();
-        FlowInterface flow = flowMetaStore.findByExecution(executionRunning.getExecution()).orElseThrow();
-        ExecutionRunning processed = executionRunningStorage.countThenProcess(flow, (dslContext, count) -> {
-            ExecutionRunning computed = executorService.processExecutionRunning(flow, count, executionRunning);
-            if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.RUNNING && !computed.getExecution().getState().isTerminated()) {
-                executionRunningStorage.save(dslContext, computed);
-            } else if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
-                executionQueuedStorage.save(dslContext, ExecutionQueued.fromExecutionRunning(computed));
-            }
-            return computed;
-        });
-
-        try {
-            executionQueue.emit(processed.getExecution());
-        } catch (QueueException e) {
-            try {
-                this.executionQueue.emit(
-                    processed.getExecution().failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED)
-                );
-            } catch (QueueException ex) {
-                log.error("Unable to emit the execution {}", processed.getExecution().getId(), ex);
-            }
-        }
+        // we need to update the execution after applying the concurrency limit, so we use the lock for that
+        Executor executor = executionRepository.lock(executionRunning.getExecution().getId(), pair -> {
+            Execution execution = pair.getLeft();
+            Executor newExecutor = new Executor(execution, null);
+            FlowInterface flow = flowMetaStore.findByExecution(execution).orElseThrow();
+            ExecutionRunning processed = executionRunningStorage.countThenProcess(flow, (dslContext, count) -> {
+                ExecutionRunning computed = executorService.processExecutionRunning(flow, count, executionRunning.withExecution(execution)); // be sure that the execution running contains the latest value of the execution
+                if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.RUNNING && !computed.getExecution().getState().isTerminated()) {
+                    executionRunningStorage.save(dslContext, computed);
+                } else if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
+                    executionQueuedStorage.save(dslContext, ExecutionQueued.fromExecutionRunning(computed));
+                }
+                return computed;
+            });
+
+            return Pair.of(
+                newExecutor.withExecution(processed.getExecution(), "handleExecutionRunning"),
+                pair.getRight()
+            );
+        });
+
+        toExecution(executor);
     }

     private Executor killingOrAfterKillState(final String executionId, Optional<State.Type> afterKillState) {
@@ -1065,7 +1071,11 @@ public class JdbcExecutor implements ExecutorInterface, Service {
             executorService.log(log, false, executor);
         }

-        // the terminated state can only come from the execution queue, in this case we always have a flow in the executor
+        // the terminated state can come from the execution queue, in this case we always have a flow in the executor
+        // or from a worker task in an afterExecution block, in this case we need to load the flow
+        if (executor.getFlow() == null && executor.getExecution().getState().isTerminated()) {
+            executor = executor.withFlow(findFlow(executor.getExecution()));
+        }
         boolean isTerminated = executor.getFlow() != null && executionService.isTerminated(executor.getFlow(), executor.getExecution());

         // purge the executionQueue
@@ -1121,8 +1131,16 @@ public class JdbcExecutor implements ExecutorInterface, Service {
                 executor.getFlow().getId(),
                 throwConsumer(queued -> {
                     var newExecution = queued.withState(State.Type.RUNNING);
-                    metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
+                    ExecutionRunning executionRunning = ExecutionRunning.builder()
+                        .tenantId(newExecution.getTenantId())
+                        .namespace(newExecution.getNamespace())
+                        .flowId(newExecution.getFlowId())
+                        .execution(newExecution)
+                        .concurrencyState(ExecutionRunning.ConcurrencyState.RUNNING)
+                        .build();
+                    executionRunningStorage.save(executionRunning);
                     executionQueue.emit(newExecution);
+                    metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
                 })
             );
         }
@@ -1207,13 +1225,13 @@ public class JdbcExecutor implements ExecutorInterface, Service {
             try {
                 // Handle paused tasks
                 if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESUME_FLOW) && !pair.getLeft().getState().isTerminated()) {
-                    FlowInterface flow = flowMetaStore.findByExecution(pair.getLeft()).orElseThrow();
                     if (executionDelay.getTaskRunId() == null) {
                         // if taskRunId is null, this means we restart a flow that was delayed at startup (scheduled on)
                         Execution markAsExecution = pair.getKey().withState(executionDelay.getState());
                         executor = executor.withExecution(markAsExecution, "pausedRestart");
                     } else {
                         // if there is a taskRun it means we restart a paused task
+                        FlowInterface flow = flowMetaStore.findByExecution(pair.getLeft()).orElseThrow();
                         Execution markAsExecution = executionService.markAs(
                             pair.getKey(),
                             flow,
@@ -1362,13 +1380,11 @@ public class JdbcExecutor implements ExecutorInterface, Service {
     private Executor handleFailedExecutionFromExecutor(Executor executor, Exception e) {
         Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);

-        failedExecutionWithLog.getLogs().forEach(log -> {
-            try {
-                logQueue.emitAsync(log);
-            } catch (QueueException ex) {
-                // fail silently
-            }
-        });
+        try {
+            logQueue.emitAsync(failedExecutionWithLog.getLogs());
+        } catch (QueueException ex) {
+            // fail silently
+        }

         return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
     }
@@ -1386,7 +1402,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {

         setState(ServiceState.TERMINATING);
         this.receiveCancellations.forEach(Runnable::run);
-        scheduledDelay.shutdown();
+        ExecutorsUtils.closeScheduledThreadPool(scheduledDelay, Duration.ofSeconds(5), List.of(executionDelayFuture, monitorSLAFuture));
         setState(ServiceState.TERMINATED_GRACEFULLY);

         if (log.isDebugEnabled()) {
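Editor's note: the close() change above hands the periodic futures to a helper so they can be cancelled before the pool is shut down; otherwise scheduleAtFixedRate tasks never complete and the watcher threads never wake up. A hedged sketch of the shutdown ordering such a helper needs (ExecutorsUtils.closeScheduledThreadPool is Kestra's own utility; this is not its actual code):

    import java.time.Duration;
    import java.util.List;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    // Sketch: cancel the periodic futures first (their watcher threads will see a
    // CancellationException, which the diff above now ignores), then shut down the
    // pool and wait a bounded time before forcing termination.
    final class ScheduledPoolCloser {
        static void close(ScheduledExecutorService pool, Duration timeout, List<ScheduledFuture<?>> futures) {
            futures.forEach(future -> {
                if (future != null) {
                    future.cancel(false);
                }
            });
            pool.shutdown();
            try {
                if (!pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                    pool.shutdownNow();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                pool.shutdownNow();
            }
        }
    }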
@@ -7,16 +7,13 @@ import com.google.common.collect.Iterables;
|
|||||||
import io.kestra.core.exceptions.DeserializationException;
|
import io.kestra.core.exceptions.DeserializationException;
|
||||||
import io.kestra.core.metrics.MetricRegistry;
|
import io.kestra.core.metrics.MetricRegistry;
|
||||||
import io.kestra.core.models.executions.Execution;
|
import io.kestra.core.models.executions.Execution;
|
||||||
import io.kestra.core.queues.QueueException;
|
import io.kestra.core.queues.*;
|
||||||
import io.kestra.core.queues.QueueInterface;
|
|
||||||
import io.kestra.core.queues.QueueService;
|
|
||||||
import io.kestra.core.utils.Either;
|
import io.kestra.core.utils.Either;
|
||||||
import io.kestra.core.utils.ExecutorsUtils;
|
import io.kestra.core.utils.ExecutorsUtils;
|
||||||
import io.kestra.core.utils.IdUtils;
|
import io.kestra.core.utils.IdUtils;
|
||||||
import io.kestra.jdbc.JdbcTableConfigs;
|
import io.kestra.jdbc.JdbcTableConfigs;
|
||||||
import io.kestra.jdbc.JdbcMapper;
|
import io.kestra.jdbc.JdbcMapper;
|
||||||
import io.kestra.jdbc.JooqDSLContextWrapper;
|
import io.kestra.jdbc.JooqDSLContextWrapper;
|
||||||
import io.kestra.core.queues.MessageTooBigException;
|
|
||||||
import io.kestra.jdbc.repository.AbstractJdbcRepository;
|
import io.kestra.jdbc.repository.AbstractJdbcRepository;
|
||||||
import io.micrometer.core.instrument.Counter;
|
import io.micrometer.core.instrument.Counter;
|
||||||
import io.micrometer.core.instrument.Timer;
|
import io.micrometer.core.instrument.Timer;
|
||||||
@@ -42,6 +39,7 @@ import java.util.function.BiConsumer;
|
|||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||||
import static io.kestra.core.utils.Rethrow.throwRunnable;
|
import static io.kestra.core.utils.Rethrow.throwRunnable;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@@ -151,6 +149,11 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
|
|||||||
.execute();
|
.execute();
|
||||||
});
|
});
|
||||||
} catch (DataException e) { // The exception is from the data itself, not the database/network/driver so instead of fail fast, we throw a recoverable QueueException
|
} catch (DataException e) { // The exception is from the data itself, not the database/network/driver so instead of fail fast, we throw a recoverable QueueException
|
||||||
|
// Postgres refuses to store JSONB with the '\0000' codepoint as it has no textual representation.
|
||||||
|
// We try to detect that and fail with a specific exception so the Worker can recover from it.
|
||||||
|
if (e.getMessage() != null && e.getMessage().contains("ERROR: unsupported Unicode escape sequence")) {
|
||||||
|
throw new UnsupportedMessageException(e.getMessage(), e);
|
||||||
|
}
|
||||||
throw new QueueException("Unable to emit a message to the queue", e);
|
throw new QueueException("Unable to emit a message to the queue", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -171,8 +174,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
     }
 
     @Override
-    public void emitAsync(String consumerGroup, T message) throws QueueException {
-        this.asyncPoolExecutor.submit(throwRunnable(() -> this.emit(consumerGroup, message)));
+    public void emitAsync(String consumerGroup, List<T> messages) throws QueueException {
+        this.asyncPoolExecutor.submit(throwRunnable(() -> messages.forEach(throwConsumer(message -> this.emit(consumerGroup, message)))));
     }
 
     @Override
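
emitAsync now takes a List<T> instead of a single message, so a whole batch is emitted from one executor task; that amortizes submission overhead and keeps the messages of one batch in order relative to each other. A minimal sketch of the pattern with illustrative names (not Kestra's actual API):

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Sketch: submitting one task per batch (rather than one per message)
    // preserves intra-batch ordering and reduces executor churn.
    final class BatchedAsyncEmitter<T> {
        private final ExecutorService pool = Executors.newSingleThreadExecutor();

        // Stand-in for the synchronous emit of the real queue.
        void emit(String consumerGroup, T message) {
            System.out.println(consumerGroup + " <- " + message);
        }

        void emitAsync(String consumerGroup, List<T> messages) {
            pool.submit(() -> messages.forEach(m -> emit(consumerGroup, m)));
        }
    }
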
@@ -27,7 +27,7 @@ dependencies {
     // as Jackson is in the Micronaut BOM, to force its version we need to use enforcedPlatform but it didn't really work, see later :(
     api enforcedPlatform("com.fasterxml.jackson:jackson-bom:$jacksonVersion")
     api enforcedPlatform("org.slf4j:slf4j-api:$slf4jVersion")
-    api platform("io.micronaut.platform:micronaut-platform:4.8.2")
+    api platform("io.micronaut.platform:micronaut-platform:4.9.2")
     api platform("io.qameta.allure:allure-bom:2.29.1")
     // we define cloud bom here for GCP, Azure and AWS so they are aligned for all plugins that use them (secret, storage, oss and ee plugins)
     api platform('com.google.cloud:libraries-bom:26.64.0')
@@ -2,6 +2,7 @@ package io.kestra.core.junit.extensions;
 
 import io.kestra.core.junit.annotations.KestraTest;
 import io.kestra.core.runners.StandAloneRunner;
+import io.kestra.core.utils.TestsUtils;
 import io.micronaut.test.annotation.MicronautTestValue;
 import io.micronaut.test.extensions.junit5.MicronautJunit5Extension;
 import org.junit.jupiter.api.extension.ExtensionContext;
@@ -54,4 +55,11 @@ public class KestraTestExtension extends MicronautJunit5Extension {
             }
         }
     }
+
+    @Override
+    public void afterTestExecution(ExtensionContext context) throws Exception {
+        super.afterTestExecution(context);
+
+        TestsUtils.queueConsumersCleanup();
+    }
 }
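
KestraTestExtension overrides afterTestExecution, which JUnit 5 invokes once per test immediately after the test body runs, so consumers registered during a test are cancelled deterministically. The same hook on a plain JUnit 5 extension looks like this (a sketch; the registry is a stand-in for TestsUtils):

    import java.util.ArrayList;
    import java.util.List;
    import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
    import org.junit.jupiter.api.extension.ExtensionContext;

    // Sketch: JUnit calls afterTestExecution right after each test body,
    // making it a natural place to release per-test resources.
    public class QueueCleanupExtension implements AfterTestExecutionCallback {
        // Hypothetical registry mirroring TestsUtils.queueConsumersCleanup().
        static final List<Runnable> CANCELLATIONS = new ArrayList<>();

        @Override
        public void afterTestExecution(ExtensionContext context) {
            CANCELLATIONS.forEach(Runnable::run);
            CANCELLATIONS.clear();
        }
    }
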
@@ -38,6 +38,10 @@ public abstract class AbstractTaskRunnerTest {
     @Test
     protected void run() throws Exception {
         var runContext = runContext(this.runContextFactory);
+        simpleRun(runContext);
+    }
+
+    private void simpleRun(RunContext runContext) throws Exception {
         var commands = initScriptCommands(runContext);
         Mockito.when(commands.getCommands()).thenReturn(
             Property.ofValue(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'")))
@@ -166,6 +170,13 @@ public abstract class AbstractTaskRunnerTest {
         assertThat(taskException.getLogConsumer().getOutputs().get("logOutput")).isEqualTo("Hello World");
     }
 
+    @Test
+    protected void canWorkMultipleTimeInSameWdir() throws Exception {
+        var runContext = runContext(this.runContextFactory);
+        simpleRun(runContext);
+        simpleRun(runContext);
+    }
+
     protected RunContext runContext(RunContextFactory runContextFactory) {
         return this.runContext(runContextFactory, null);
     }
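
Extracting simpleRun lets canWorkMultipleTimeInSameWdir execute the identical commands twice against one RunContext, asserting that a task runner tolerates an already-populated working directory. A self-contained sketch of the same property, using only the JDK and JUnit (names are illustrative, not Kestra's API):

    import java.nio.file.Files;
    import java.nio.file.Path;
    import org.junit.jupiter.api.Test;
    import static org.junit.jupiter.api.Assertions.assertTrue;

    // Sketch of what "can work multiple times in the same wdir" guards against:
    // a run that leaves state in its working directory must still succeed on a
    // second run against that same directory.
    class WorkingDirReuseTest {
        void runOnce(Path wdir) throws Exception {
            Path marker = wdir.resolve("marker.txt");
            Files.writeString(marker, "ran"); // overwrites, rather than fails, if present
            assertTrue(Files.exists(marker));
        }

        @Test
        void canRunTwiceInSameWorkingDir() throws Exception {
            Path wdir = Files.createTempDirectory("wdir");
            runOnce(wdir);
            runOnce(wdir); // must tolerate leftovers from the first run
        }
    }
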
@@ -41,8 +41,15 @@ import java.util.stream.Collectors;
 import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
 
 abstract public class TestsUtils {
+    private static final ThreadLocal<List<Runnable>> queueConsumersCancellations = ThreadLocal.withInitial(ArrayList::new);
+
     private static final ObjectMapper mapper = JacksonMapper.ofYaml();
 
+    public static void queueConsumersCleanup() {
+        queueConsumersCancellations.get().forEach(Runnable::run);
+        queueConsumersCancellations.get().clear();
+    }
+
     public static <T> T map(String path, Class<T> cls) throws IOException {
         URL resource = TestsUtils.class.getClassLoader().getResource(path);
         assert resource != null;
@@ -213,6 +220,7 @@ abstract public class TestsUtils {
             }
         };
         Runnable receiveCancellation = queueType == null ? queue.receive(consumerGroup, eitherConsumer, false) : queue.receive(consumerGroup, queueType, eitherConsumer, false);
+        queueConsumersCancellations.get().add(receiveCancellation);
 
         return Flux.<T>create(sink -> {
             DeserializationException exception = exceptionRef.get();
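
The pattern here is a cancellation registry: every queue.receive(...) returns a Runnable that tears the subscription down, and storing those per thread means cleanup only touches the consumers the current test created. Reduced to its essentials (a sketch, not Kestra's code):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the cancellation-registry pattern: subscriptions hand back a
    // cancel Runnable; registering them per thread lets teardown cancel exactly
    // the consumers created on that thread.
    final class ConsumerRegistry {
        private static final ThreadLocal<List<Runnable>> CANCELLATIONS =
            ThreadLocal.withInitial(ArrayList::new);

        static void register(Runnable cancellation) {
            CANCELLATIONS.get().add(cancellation);
        }

        static void cleanup() {
            CANCELLATIONS.get().forEach(Runnable::run);
            CANCELLATIONS.get().clear();
        }
    }
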
14 ui/package-lock.json (generated)
@@ -10,7 +10,7 @@
         "hasInstallScript": true,
         "dependencies": {
             "@js-joda/core": "^5.6.5",
-            "@kestra-io/ui-libs": "^0.0.228",
+            "@kestra-io/ui-libs": "^0.0.232",
             "@vue-flow/background": "^1.3.2",
             "@vue-flow/controls": "^1.1.2",
             "@vue-flow/core": "^1.45.0",
@@ -1792,9 +1792,9 @@
             }
         },
         "node_modules/@eslint/plugin-kit": {
-            "version": "0.3.3",
-            "resolved": "https://registry.npmjs.org/@eslint/plugin-kit/-/plugin-kit-0.3.3.tgz",
-            "integrity": "sha512-1+WqvgNMhmlAambTvT3KPtCl/Ibr68VldY2XY40SL1CE0ZXiakFR/cbTspaF5HsnpDMvcYYoJHfl4980NBjGag==",
+            "version": "0.3.4",
+            "resolved": "https://registry.npmjs.org/@eslint/plugin-kit/-/plugin-kit-0.3.4.tgz",
+            "integrity": "sha512-Ul5l+lHEcw3L5+k8POx6r74mxEYKG5kOb6Xpy2gCRW6zweT6TEhAf8vhxGgjhqrd/VO/Dirhsb+1hNpD1ue9hw==",
             "dev": true,
             "license": "Apache-2.0",
             "dependencies": {
@@ -3133,9 +3133,9 @@
             "license": "BSD-3-Clause"
         },
         "node_modules/@kestra-io/ui-libs": {
-            "version": "0.0.228",
-            "resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.228.tgz",
-            "integrity": "sha512-ZSUpBEhTJ7Ul0QtMU/ioDlgryoVwZv/BD1ko96q+m9sCA4Uab1yi2LUf+ZpEEzZWH8r37E/CNK6HNjG+tei7eA==",
+            "version": "0.0.232",
+            "resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.232.tgz",
+            "integrity": "sha512-4Z1DNxWEZSEEy2Tv63uNf2remxb/IqVUY01/qCaeYjLcp5axrS7Dn43N8DspA4EPdlhe4JFq2RhG13Pom+JDQA==",
             "dependencies": {
                 "@nuxtjs/mdc": "^0.16.1",
                 "@popperjs/core": "^2.11.8",
@@ -24,7 +24,7 @@
     },
     "dependencies": {
         "@js-joda/core": "^5.6.5",
-        "@kestra-io/ui-libs": "^0.0.228",
+        "@kestra-io/ui-libs": "^0.0.232",
         "@vue-flow/background": "^1.3.2",
         "@vue-flow/controls": "^1.1.2",
         "@vue-flow/core": "^1.45.0",
@@ -149,7 +149,7 @@
             "@popperjs/core": "npm:@sxzz/popperjs-es@^2.11.7"
         },
         "el-table-infinite-scroll": {
-            "vue": "$vue"
+            "vue": "^3.5.18"
         },
         "storybook": "$storybook"
     },
@@ -25,4 +25,4 @@ const ANIMALS: string[] = [
 const getRandomNumber = (minimum: number = MINIMUM, maximum: number = MAXIMUM): number => Math.floor(Math.random() * (maximum - minimum + 1)) + minimum;
 const getRandomAnimal = (): string => ANIMALS[Math.floor(Math.random() * ANIMALS.length)];
 
-export const getRandomFlowID = (): string => `${getRandomAnimal()}_${getRandomNumber()}`.toLowerCase();
+export const getRandomID = (): string => `${getRandomAnimal()}_${getRandomNumber()}`.toLowerCase();
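
The renamed helper composes an ID from a random animal and a bounded random integer; Math.floor(Math.random() * (max - min + 1)) + min is the usual formula for an inclusive integer range. The same scheme as a Java sketch (the animal list is illustrative):

    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    // Sketch: nextInt's upper bound is exclusive, so max + 1 reproduces the
    // inclusive range the TypeScript formula yields.
    final class RandomId {
        private static final List<String> ANIMALS = List.of("otter", "lynx", "ibis");

        static String next(int min, int max) {
            String animal = ANIMALS.get(ThreadLocalRandom.current().nextInt(ANIMALS.size()));
            int n = ThreadLocalRandom.current().nextInt(min, max + 1);
            return (animal + "_" + n).toLowerCase();
        }
    }
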
@@ -18,7 +18,6 @@
 import VueTour from "./components/onboarding/VueTour.vue";
 import DefaultLayout from "override/components/layout/DefaultLayout.vue";
 import DocIdDisplay from "./components/DocIdDisplay.vue";
-import posthog from "posthog-js";
 import "@kestra-io/ui-libs/style.css";
 
 import {useApiStore} from "./stores/api";
@@ -26,6 +25,7 @@
 import {useLayoutStore} from "./stores/layout";
 import {useCoreStore} from "./stores/core";
 import {useDocStore} from "./stores/doc";
+import {initPostHogForSetup} from "./composables/usePosthog";
 import {useMiscStore} from "./stores/misc";
 import {useExecutionsStore} from "./stores/executions";
 import * as BasicAuth from "./utils/basicAuth";
@@ -118,66 +118,10 @@
                 uid: uid,
             });
 
-            this.apiStore.loadConfig()
-                .then(apiConfig => {
-                    this.initStats(apiConfig, config, uid);
-                })
+            await initPostHogForSetup(config);
 
             return config;
         },
-        initStats(apiConfig, config, uid) {
-            if (!this.configs || this.configs["isAnonymousUsageEnabled"] === false) {
-                return;
-            }
-
-            // only run posthog in production
-            if (import.meta.env.MODE === "production") {
-                posthog.init(
-                    apiConfig.posthog.token,
-                    {
-                        api_host: apiConfig.posthog.apiHost,
-                        ui_host: "https://eu.posthog.com",
-                        capture_pageview: false,
-                        capture_pageleave: true,
-                        autocapture: false,
-                    }
-                )
-
-                posthog.register_once(this.statsGlobalData(config, uid));
-
-                if (!posthog.get_property("__alias")) {
-                    posthog.alias(apiConfig.id);
-                }
-            }
-
-
-            let surveyVisible = false;
-            window.addEventListener("PHSurveyShown", () => {
-                surveyVisible = true;
-            });
-
-            window.addEventListener("PHSurveyClosed", () => {
-                surveyVisible = false;
-            })
-
-            window.addEventListener("KestraRouterAfterEach", () => {
-                if (surveyVisible) {
-                    window.dispatchEvent(new Event("PHSurveyClosed"))
-                    surveyVisible = false;
-                }
-            })
-        },
-        statsGlobalData(config, uid) {
-            return {
-                from: "APP",
-                iid: config.uuid,
-                uid: uid,
-                app: {
-                    version: config.version,
-                    type: "OSS"
-                }
-            }
-        },
         },
         watch: {
             $route: {
@@ -3,34 +3,37 @@
         <el-table-column v-for="(column, index) in generateTableColumns" :key="index" :prop="column" :label="column">
             <template #default="scope">
                 <template v-if="isComplex(scope.row[column])">
-                    <editor
-                        :full-height="false"
-                        :input="true"
-                        :navbar="false"
-                        :model-value="JSON.stringify(scope.row[column])"
-                        lang="json"
-                        read-only
+                    <el-input
+                        type="textarea"
+                        :model-value="truncate(JSON.stringify(scope.row[column], null, 2))"
+                        readonly
+                        :rows="3"
+                        autosize
+                        class="ks-editor"
+                        resize="none"
                     />
                 </template>
                 <template v-else>
-                    {{ scope.row[column] }}
+                    {{ truncate(scope.row[column]) }}
                 </template>
             </template>
         </el-table-column>
     </el-table>
 </template>
 <script>
-    import Editor from "./inputs/Editor.vue";
-
     export default {
         name: "ListPreview",
-        components: {Editor},
         props: {
             value: {
                 type: Array,
                 required: true
             }
         },
+        data() {
+            return {
+                maxColumnLength: 100
+            }
+        },
         computed: {
             generateTableColumns() {
                 const allKeys = new Set();
@@ -43,6 +46,12 @@
         methods: {
             isComplex(data) {
                 return data instanceof Array || data instanceof Object;
+            },
+            truncate(text) {
+                if (typeof text !== "string") return text;
+                return text.length > this.maxColumnLength
+                    ? text.slice(0, this.maxColumnLength) + "..."
+                    : text;
             }
         }
     }
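
The component swaps the heavyweight editor for a read-only textarea and truncates each cell to maxColumnLength characters, passing non-strings through untouched. The same guard-then-slice logic as a Java sketch:

    // Sketch of the component's truncate(): only strings are shortened; anything
    // else (numbers, booleans, etc.) is returned as-is, mirroring the typeof guard.
    final class CellTruncate {
        static Object truncate(Object value, int maxColumnLength) {
            if (!(value instanceof String s)) {
                return value;
            }
            return s.length() > maxColumnLength
                ? s.substring(0, maxColumnLength) + "..."
                : s;
        }
    }
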
@@ -48,6 +48,7 @@
             v-on="activeTab['v-on'] ?? {}"
             ref="tabContent"
             :is="activeTab.component"
+            :namespace="namespaceToForward"
             @go-to-detail="blueprintId => selectedBlueprintId = blueprintId"
             :embed="activeTab.props && activeTab.props.embed !== undefined ? activeTab.props.embed : true"
         />
@@ -163,16 +164,11 @@
         },
         getTabClasses(tab) {
             const isEnterpriseTab = tab.locked;
-            const isGanttTab = tab.name === "gantt";
-            const ROUTES = ["/flows/edit/", "/namespaces/edit/"];
-            const EDIT_ROUTES = ROUTES.some(route => this.$route.path.startsWith(route));
-            const isOverviewTab = EDIT_ROUTES && tab.title === "Overview";
 
             return {
-                "container": !isEnterpriseTab && !isOverviewTab,
-                "mt-4": !isEnterpriseTab && !isOverviewTab,
-                "px-0": isEnterpriseTab && isOverviewTab,
-                "gantt-container": isGanttTab
+                "container": !isEnterpriseTab,
+                "mt-4": !isEnterpriseTab,
+                "px-0": isEnterpriseTab,
             };
         },
     },
@@ -209,6 +205,11 @@
             Object.entries(this.$attrs)
                 .filter(([key]) => key !== "class")
             );
+        },
+        namespaceToForward(){
+            return this.activeTab.props?.namespace ?? this.namespace;
+            // in the special case of Namespace creation on the Namespaces page, the tabs are loaded before the namespace creation
+            // in this case this.props.namespace will be used
         }
     }
 };
Some files were not shown because too many files have changed in this diff.