Mirror of https://github.com/kestra-io/kestra.git (synced 2025-12-25 11:12:12 -05:00)

Compare commits: fix/remove...v0.24.5 (119 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 0a81f67981 |  |
|  | c4757ed915 |  |
|  | 4577070e32 |  |
|  | 7bd519ddb4 |  |
|  | 62c85078b6 |  |
|  | 3718be9658 |  |
|  | 2df6c1b730 |  |
|  | 9af86ea677 |  |
|  | 8601905994 |  |
|  | 9de1a15d02 |  |
|  | 25b056ebb3 |  |
|  | 87d8f9867f |  |
|  | a00c1f8397 |  |
|  | f4470095ff |  |
|  | cbfaa8815d |  |
|  | 10e55bbb77 |  |
|  | 59d5d4cb91 |  |
|  | e8ee3b0a84 |  |
|  | 602ff849e3 |  |
|  | 155bdca83f |  |
|  | faaaeada3a |  |
|  | 6ef35974d7 |  |
|  | 46f9bb768f |  |
|  | ab87f63e8c |  |
|  | cdb73ccbd7 |  |
|  | 8fc936e0a3 |  |
|  | 1e0ebc94b8 |  |
|  | 5318592eff |  |
|  | 2da08f160d |  |
|  | 8cbc9e7aff |  |
|  | f8e15d103f |  |
|  | 49794a4f2a |  |
|  | bafa5fe03c |  |
|  | 208b244f0f |  |
|  | b93976091d |  |
|  | eec52d76f0 |  |
|  | b96fd87572 |  |
|  | 1aa5bfab43 |  |
|  | c4572e86a5 |  |
|  | f2f97bb70c |  |
|  | 804c740d3c |  |
|  | 75cd4f44e0 |  |
|  | f167a2a2bb |  |
|  | 08d9416e3a |  |
|  | 2a879c617c |  |
|  | 3227ca7c11 |  |
|  | 428a52ce02 |  |
|  | f58bc4caba |  |
|  | e99ae9513f |  |
|  | c8b51fcacf |  |
|  | 813b2f6439 |  |
|  | c6b5bca25b |  |
|  | de35d2cdb9 |  |
|  | a6ffbd59d0 |  |
|  | 568740a214 |  |
|  | aa0d2c545f |  |
|  | cda77d5146 |  |
|  | d4fd1f61ba |  |
|  | 9859ea5eb6 |  |
|  | aca374a28f |  |
|  | c413ba95e1 |  |
|  | 9c6b92619e |  |
|  | 8173e8df51 |  |
|  | 5c95505911 |  |
|  | 33f0b533bb |  |
|  | 23e35a7f97 |  |
|  | 0357321c58 |  |
|  | 5c08403398 |  |
|  | a63cb71218 |  |
|  | 317885b91c |  |
|  | 87637302e4 |  |
|  | 056faaaf9f |  |
|  | 54c74a1328 |  |
|  | fae0c88c5e |  |
|  | db5d83d1cb |  |
|  | 066b947762 |  |
|  | b6597475b1 |  |
|  | f2610baf15 |  |
|  | b619bf76d8 |  |
|  | 117f453a77 |  |
|  | 053d6276ff |  |
|  | 3870eca70b |  |
|  | afd7c216f9 |  |
|  | 59a17e88e7 |  |
|  | 99f8dca1c2 |  |
|  | 1068c9fe51 |  |
|  | ea6d30df7c |  |
|  | 04ba7363c2 |  |
|  | 281a987944 |  |
|  | c9ce54b0be |  |
|  | ccd9baef3c |  |
|  | 97869b9c75 |  |
|  | 1c681c1492 |  |
|  | de2a446f93 |  |
|  | d778947017 |  |
|  | 3f97845fdd |  |
|  | 631cd169a1 |  |
|  | 1648fa076c |  |
|  | 474806882e |  |
|  | 65467bd118 |  |
|  | 387bbb80ac |  |
|  | 19d4c64f19 |  |
|  | 809c0a228c |  |
|  | 6a045900fb |  |
|  | 4ada5fe8f3 |  |
|  | 998087ca30 |  |
|  | 146338e48f |  |
|  | de177b925e |  |
|  | 04bfb19095 |  |
|  | c913c48785 |  |
|  | 0d5b593d42 |  |
|  | 83f92535c5 |  |
|  | fd6a0a6c11 |  |
|  | 104c4c97b4 |  |
|  | 21cd21269f |  |
|  | 679befa2fe |  |
|  | 8a0ecdeb8a |  |
|  | ee8762e138 |  |
|  | d16324f265 |  |
.github/workflows/auto-translate-ui-keys.yml (4 changes, vendored)

@@ -2,7 +2,7 @@ name: Auto-Translate UI keys and create PR
 on:
   schedule:
-    - cron: "0 9-21 * * *" # Every hour from 9 AM to 9 PM
+    - cron: "0 9-21/3 * * *" # Every 3 hours from 9 AM to 9 PM
   workflow_dispatch:
     inputs:
       retranslate_modified_keys:
@@ -20,7 +20,7 @@ jobs:
     runs-on: ubuntu-latest
     timeout-minutes: 10
     steps:
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        name: Checkout
        with:
          fetch-depth: 0
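The new schedule relies on cron step syntax: `9-21/3` matches every third hour of the 9-21 range, so the job now fires at 9:00, 12:00, 15:00, 18:00, and 21:00 instead of hourly. A quick sketch that enumerates the matched hours (illustrative only, not part of the workflow):

```java
// Hours matched by the cron hour field "9-21/3": start at 9, step by 3, stop at 21.
public class CronStepHours {
    public static void main(String[] args) {
        for (int hour = 9; hour <= 21; hour += 3) {
            System.out.println("fires at " + hour + ":00"); // 9, 12, 15, 18, 21
        }
    }
}
```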
.github/workflows/codeql-analysis.yml (2 changes, vendored)

@@ -27,7 +27,7 @@ jobs:

     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
         with:
           # We must fetch at least the immediate parents so that if this is
           # a pull request then we can checkout the head.
.github/workflows/docker.yml (147 changes, vendored; file deleted)

@@ -1,147 +0,0 @@
name: Create Docker images on Release

on:
  workflow_dispatch:
    inputs:
      retag-latest:
        description: 'Retag latest Docker images'
        required: true
        type: string
        default: "false"
        options:
          - "true"
          - "false"
      release-tag:
        description: 'Kestra Release Tag'
        required: false
        type: string
      plugin-version:
        description: 'Plugin version'
        required: false
        type: string
        default: "LATEST"

env:
  PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}

jobs:
  plugins:
    name: List Plugins
    runs-on: ubuntu-latest
    outputs:
      plugins: ${{ steps.plugins.outputs.plugins }}
    steps:
      # Checkout
      - uses: actions/checkout@v4

      # Get Plugins List
      - name: Get Plugins List
        uses: ./.github/actions/plugins-list
        id: plugins
        with:
          plugin-version: ${{ env.PLUGIN_VERSION }}

  docker:
    name: Publish Docker
    needs: [ plugins ]
    runs-on: ubuntu-latest
    strategy:
      matrix:
        image:
          - name: "-no-plugins"
            plugins: ""
            packages: jattach
            python-libs: ""
          - name: ""
            plugins: ${{needs.plugins.outputs.plugins}}
            packages: python3 python-is-python3 python3-pip curl jattach
            python-libs: kestra
    steps:
      - uses: actions/checkout@v4

      # Vars
      - name: Set image name
        id: vars
        run: |
          if [[ "${{ inputs.release-tag }}" == "" ]]; then
            TAG=${GITHUB_REF#refs/*/}
            echo "tag=${TAG}" >> $GITHUB_OUTPUT
          else
            TAG="${{ inputs.release-tag }}"
            echo "tag=${TAG}" >> $GITHUB_OUTPUT
          fi

          if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
          else
            echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
          fi

      # Download release
      - name: Download release
        uses: robinraju/release-downloader@v1.12
        with:
          tag: ${{steps.vars.outputs.tag}}
          fileName: 'kestra-*'
          out-file-path: build/executable

      - name: Copy exe to image
        run: |
          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra

      # Docker setup
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3

      - name: Docker - Fix Qemu
        shell: bash
        run: |
          docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      # Docker Login
      - name: Login to DockerHub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_PASSWORD }}

      # Docker Build and push
      - name: Push to Docker Hub
        uses: docker/build-push-action@v6
        with:
          context: .
          push: true
          tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
          platforms: linux/amd64,linux/arm64
          build-args: |
            KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
            APT_PACKAGES=${{ matrix.image.packages }}
            PYTHON_LIBRARIES=${{ matrix.image.python-libs }}

      - name: Install regctl
        if: github.event.inputs.retag-latest == 'true'
        uses: regclient/actions/regctl-installer@main

      - name: Retag to latest
        if: github.event.inputs.retag-latest == 'true'
        run: |
          regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}

  end:
    runs-on: ubuntu-latest
    needs:
      - docker
    if: always()
    env:
      SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
    steps:

      # Slack
      - name: Slack notification
        uses: Gamesight/slack-workflow-status@master
        if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
        with:
          repo_token: ${{ secrets.GITHUB_TOKEN }}
          slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
          name: GitHub Actions
          icon_emoji: ':github-actions:'
          channel: 'C02DQ1A7JLR' # _int_git channel
.github/workflows/e2e.yml (4 changes, vendored)

@@ -19,7 +19,7 @@ on:
       default: "no input"
 jobs:
   check:
-    timeout-minutes: 10
+    timeout-minutes: 15
     runs-on: ubuntu-latest
     env:
       GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
@@ -32,7 +32,7 @@ jobs:
           password: ${{ github.token }}

       - name: Checkout kestra
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
         with:
           path: kestra
.github/workflows/gradle-release-plugins.yml (4 changes, vendored)

@@ -21,12 +21,12 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
         with:
           fetch-depth: 0

       # Checkout GitHub Actions
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
         with:
           repository: kestra-io/actions
           path: actions
.github/workflows/gradle-release.yml (4 changes, vendored)

@@ -33,13 +33,13 @@ jobs:
            exit 1;
          fi
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
         with:
           fetch-depth: 0
           path: kestra

       # Checkout GitHub Actions
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
         with:
           repository: kestra-io/actions
           path: actions
.github/workflows/main.yml (10 changes, vendored)

@@ -4,9 +4,8 @@ on:
   workflow_dispatch:
     inputs:
       plugin-version:
-        description: "Kestra version"
-        default: 'LATEST'
-        required: true
+        description: "plugins version"
+        required: false
         type: string
   push:
     branches:
@@ -34,7 +33,7 @@ jobs:
     if: "!startsWith(github.ref, 'refs/heads/releases')"
     uses: ./.github/workflows/workflow-release.yml
     with:
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
+      plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
     secrets:
       DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
       DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
@@ -43,7 +42,8 @@ jobs:
       SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
       SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
       SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
       GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
+      SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
   end:
     runs-on: ubuntu-latest
     needs:
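GitHub Actions expressions have no ternary operator; `a && b || c` is the common emulation (it falls through to `c` whenever `b` is falsy, which is why the expression checks against the empty string). A rough Java analogue of the new `plugin-version` selection logic, with illustrative names:

```java
// Mirrors: inputs.plugin-version != '' && inputs.plugin-version
//          || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST')
static String pluginVersion(String input, String ref) {
    if (input != null && !input.isEmpty()) {
        return input; // an explicit dispatch input always wins
    }
    // pushes to develop get snapshot plugins, everything else the latest release
    return "refs/heads/develop".equals(ref) ? "LATEST-SNAPSHOT" : "LATEST";
}
```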
.github/workflows/setversion-tag-plugins.yml (2 changes, vendored)

@@ -17,7 +17,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
         with:
           fetch-depth: 0
.github/workflows/setversion-tag.yml (9 changes, vendored)

@@ -34,11 +34,14 @@ jobs:
           fi

       # Checkout
-      - uses: actions/checkout@v4
+      - name: Checkout
+        uses: actions/checkout@v5
         with:
           fetch-depth: 0
+          token: ${{ secrets.GH_PERSONAL_TOKEN }}

-      - name: Configure Git
+      # Configure
+      - name: Git - Configure
         run: |
           git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
           git config --global user.name "github-actions[bot]"
@@ -54,4 +57,4 @@ jobs:
           git commit -m"chore(version): update to version '$RELEASE_VERSION'"
           git push
           git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
-          git push --tags
+          git push --tags
.github/workflows/vulnerabilities-check.yml (12 changes, vendored)

@@ -17,12 +17,12 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
         with:
           fetch-depth: 0

       # Checkout GitHub Actions
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
         with:
           repository: kestra-io/actions
           path: actions
@@ -66,12 +66,12 @@ jobs:
       actions: read
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
         with:
           fetch-depth: 0

       # Checkout GitHub Actions
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
         with:
           repository: kestra-io/actions
           path: actions
@@ -111,12 +111,12 @@ jobs:
       actions: read
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
         with:
           fetch-depth: 0

       # Checkout GitHub Actions
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
         with:
           repository: kestra-io/actions
           path: actions
.github/workflows/workflow-backend-test.yml (2 changes, vendored)

@@ -29,7 +29,7 @@ jobs:
       GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
       SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
     steps:
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        name: Checkout - Current ref
        with:
          fetch-depth: 0
.github/workflows/workflow-build-artifacts.yml (76 changes, vendored)

@@ -1,23 +1,7 @@
 name: Build Artifacts

 on:
-  workflow_call:
-    inputs:
-      plugin-version:
-        description: "Kestra version"
-        default: 'LATEST'
-        required: true
-        type: string
-    outputs:
-      docker-tag:
-        value: ${{ jobs.build.outputs.docker-tag }}
-        description: "The Docker image Tag for Kestra"
-      docker-artifact-name:
-        value: ${{ jobs.build.outputs.docker-artifact-name }}
-        description: "The GitHub artifact containing the Kestra docker image name."
-      plugins:
-        value: ${{ jobs.build.outputs.plugins }}
-        description: "The Kestra plugins list used for the build."
+  workflow_call: {}

 jobs:
   build:
@@ -31,7 +15,7 @@ jobs:
       PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
     steps:
       - name: Checkout - Current ref
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
         with:
           fetch-depth: 0
@@ -82,55 +66,6 @@ jobs:
         run: |
           cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra

-      # Docker Tag
-      - name: Setup - Docker vars
-        id: vars
-        shell: bash
-        run: |
-          TAG=${GITHUB_REF#refs/*/}
-          if [[ $TAG = "master" ]]
-          then
-            TAG="latest";
-          elif [[ $TAG = "develop" ]]
-          then
-            TAG="develop";
-          elif [[ $TAG = v* ]]
-          then
-            TAG="${TAG}";
-          else
-            TAG="build-${{ github.run_id }}";
-          fi
-          echo "tag=${TAG}" >> $GITHUB_OUTPUT
-          echo "artifact=docker-kestra-${TAG}" >> $GITHUB_OUTPUT
-
-      # Docker setup
-      - name: Docker - Setup QEMU
-        uses: docker/setup-qemu-action@v3
-
-      - name: Docker - Fix Qemu
-        shell: bash
-        run: |
-          docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
-
-      - name: Docker - Setup Buildx
-        uses: docker/setup-buildx-action@v3
-
-      # Docker Build
-      - name: Docker - Build & export image
-        uses: docker/build-push-action@v6
-        if: "!startsWith(github.ref, 'refs/tags/v')"
-        with:
-          context: .
-          push: false
-          file: Dockerfile
-          tags: |
-            kestra/kestra:${{ steps.vars.outputs.tag }}
-          build-args: |
-            KESTRA_PLUGINS=${{ steps.plugins.outputs.plugins }}
-            APT_PACKAGES=${{ env.DOCKER_APT_PACKAGES }}
-            PYTHON_LIBRARIES=${{ env.DOCKER_PYTHON_LIBRARIES }}
-          outputs: type=docker,dest=/tmp/${{ steps.vars.outputs.artifact }}.tar
-
       # Upload artifacts
       - name: Artifacts - Upload JAR
         uses: actions/upload-artifact@v4
@@ -143,10 +78,3 @@ jobs:
         with:
           name: exe
           path: build/executable/
-
-      - name: Artifacts - Upload Docker
-        uses: actions/upload-artifact@v4
-        if: "!startsWith(github.ref, 'refs/tags/v')"
-        with:
-          name: ${{ steps.vars.outputs.artifact }}
-          path: /tmp/${{ steps.vars.outputs.artifact }}.tar
.github/workflows/workflow-frontend-test.yml (2 changes, vendored)

@@ -20,7 +20,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Checkout
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5

       - name: Cache Node Modules
         id: cache-node-modules
.github/workflows/workflow-github-release.yml (24 changes, vendored)

@@ -1,14 +1,17 @@
 name: Github - Release

 on:
-  workflow_dispatch:
   workflow_call:
     secrets:
       GH_PERSONAL_TOKEN:
         description: "The Github personal token."
         required: true
-  push:
-    tags:
-      - '*'
+      SLACK_RELEASES_WEBHOOK_URL:
+        description: "The Slack webhook URL."
+        required: true

 jobs:
   publish:
@@ -17,14 +20,14 @@ jobs:
     steps:
       # Check out
       - name: Checkout - Repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
         with:
           fetch-depth: 0
           submodules: true

       # Checkout GitHub Actions
       - name: Checkout - Actions
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
         with:
           repository: kestra-io/actions
           sparse-checkout-cone-mode: true
@@ -35,7 +38,7 @@ jobs:
       # Download Exec
       # Must be done after checkout actions
       - name: Artifacts - Download executable
-        uses: actions/download-artifact@v4
+        uses: actions/download-artifact@v5
         if: startsWith(github.ref, 'refs/tags/v')
         with:
           name: exe
@@ -75,4 +78,11 @@ jobs:
               "new_version": "${{ github.ref_name }}",
               "github_repository": "${{ github.repository }}",
               "github_actor": "${{ github.actor }}"
             }
           }
+
+      - name: Merge Release Notes
+        if: ${{ startsWith(github.ref, 'refs/tags/v') }}
+        uses: ./actions/.github/actions/github-release-note-merge
+        env:
+          GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
+          RELEASE_TAG: ${{ github.ref_name }}
.github/workflows/workflow-publish-docker.yml (210 changes, vendored)

@@ -1,22 +1,37 @@
-name: Publish - Docker
+name: Create Docker images on Release

 on:
   workflow_dispatch:
     inputs:
-      plugin-version:
-        description: "Kestra version"
-        default: 'LATEST'
+      retag-latest:
+        description: 'Retag latest Docker images'
+        required: true
+        type: choice
+        default: "false"
+        options:
+          - "true"
+          - "false"
+      release-tag:
+        description: 'Kestra Release Tag (by default, deduced with the ref)'
+        required: false
+        type: string
+      plugin-version:
+        description: 'Plugin version'
+        required: false
+        type: string
+        default: "LATEST"
       force-download-artifact:
         description: 'Force download artifact'
         required: false
-        type: string
+        type: choice
         default: "true"
         options:
           - "true"
           - "false"
   workflow_call:
     inputs:
       plugin-version:
-        description: "Kestra version"
+        description: "Plugin version"
         default: 'LATEST'
         required: false
         type: string
@@ -33,47 +48,93 @@ on:
       description: "The Dockerhub password."
       required: true

+env:
+  PLUGIN_VERSION: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
+
+jobs:
+  plugins:
+    name: List Plugins
+    runs-on: ubuntu-latest
+    outputs:
+      plugins: ${{ steps.plugins.outputs.plugins }}
+    steps:
+      # Checkout
+      - uses: actions/checkout@v5
+
+      # Get Plugins List
+      - name: Get Plugins List
+        uses: ./.github/actions/plugins-list
+        id: plugins
+        with: # remap LATEST-SNAPSHOT to LATEST
+          plugin-version: ${{ env.PLUGIN_VERSION == 'LATEST-SNAPSHOT' && 'LATEST' || env.PLUGIN_VERSION }}
+
   # ********************************************************************************************************************
   # Build
   # ********************************************************************************************************************
   build-artifacts:
     name: Build Artifacts
-    if: ${{ github.event.inputs.force-download-artifact == 'true' }}
+    if: ${{ inputs.force-download-artifact == 'true' }}
     uses: ./.github/workflows/workflow-build-artifacts.yml
-    with:
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}

   # ********************************************************************************************************************
   # Docker
   # ********************************************************************************************************************
-  publish:
-    name: Publish - Docker
+  docker:
+    name: Publish Docker
+    needs: [ plugins, build-artifacts ]
+    if: always()
     runs-on: ubuntu-latest
-    needs: build-artifacts
-    if: |
-      always() &&
-      (needs.build-artifacts.result == 'success' ||
-      github.event.inputs.force-download-artifact != 'true')
-    env:
-      PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
     strategy:
       matrix:
         image:
-          - tag: -no-plugins
-            plugins: false
+          - name: "-no-plugins"
+            plugins: ""
             packages: jattach
-            python-libraries: ""
+            python-libs: ""

-          - tag: ""
-            plugins: true
-            packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
-            python-libraries: kestra
+          - name: ""
+            plugins: ${{needs.plugins.outputs.plugins}}
+            packages: python3 python-is-python3 python3-pip curl jattach
+            python-libs: kestra
     steps:
-      - name: Checkout - Current ref
-        uses: actions/checkout@v4
+      - uses: actions/checkout@v5

+      # Vars
+      - name: Set image name
+        id: vars
+        run: |
+          if [[ "${{ inputs.release-tag }}" == "" ]]; then
+            TAG=${GITHUB_REF#refs/*/}
+            echo "tag=${TAG}" >> $GITHUB_OUTPUT
+          else
+            TAG="${{ inputs.release-tag }}"
+            echo "tag=${TAG}" >> $GITHUB_OUTPUT
+          fi
+
+          if [[ $GITHUB_REF == refs/tags/* ]]; then
+            if [[ $TAG =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
+              # this will remove the patch version number
+              MINOR_SEMVER=${TAG%.*}
+              echo "minor_semver=${MINOR_SEMVER}" >> $GITHUB_OUTPUT
+            else
+              echo "Tag '$TAG' is not a valid semver (vMAJOR.MINOR.PATCH), skipping minor_semver"
+            fi
+          fi
+
+          if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
+            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
+          else
+            echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
+          fi
+
+      # Download executable from artifact
+      - name: Artifacts - Download executable
+        uses: actions/download-artifact@v5
+        with:
+          name: exe
+          path: build/executable
+
+      - name: Copy exe to image
+        run: |
+          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
+
       # Docker setup
-      - name: Docker - Setup QEMU
+      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3

       - name: Docker - Fix Qemu
@@ -81,66 +142,59 @@ jobs:
         run: |
           docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes

-      - name: Docker - Setup Docker Buildx
+      - name: Set up Docker Buildx
         uses: docker/setup-buildx-action@v3

       # Docker Login
-      - name: Docker - Login to DockerHub
+      - name: Login to DockerHub
         uses: docker/login-action@v3
         with:
           username: ${{ secrets.DOCKERHUB_USERNAME }}
           password: ${{ secrets.DOCKERHUB_PASSWORD }}

-      # # Get Plugins List
-      - name: Plugins - Get List
-        uses: ./.github/actions/plugins-list
-        id: plugins-list
-        if: ${{ matrix.image.plugins}}
-        with:
-          plugin-version: ${{ env.PLUGIN_VERSION }}
-
-      # Vars
-      - name: Docker - Set variables
-        shell: bash
-        id: vars
-        run: |
-          TAG=${GITHUB_REF#refs/*/}
-          PLUGINS="${{ matrix.image.plugins == true && steps.plugins-list.outputs.plugins || '' }}"
-          if [[ $TAG == v* ]]; then
-            TAG="${TAG}";
-            echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
-          elif [[ $TAG = "develop" ]]; then
-            TAG="develop";
-            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
-          else
-            TAG="build-${{ github.run_id }}";
-            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
-          fi
-
-          echo "tag=${TAG}${{ matrix.image.tag }}" >> $GITHUB_OUTPUT
-
-      # Build Docker Image
-      - name: Artifacts - Download executable
-        uses: actions/download-artifact@v4
-        with:
-          name: exe
-          path: build/executable
-
-      - name: Docker - Copy exe to image
-        shell: bash
-        run: |
-          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
-
       # Docker Build and push
-      - name: Docker - Build image
+      - name: Push to Docker Hub
         uses: docker/build-push-action@v6
         with:
           context: .
           push: true
-          tags: kestra/kestra:${{ steps.vars.outputs.tag }}
+          tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
           platforms: linux/amd64,linux/arm64
           build-args: |
             KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
             APT_PACKAGES=${{ matrix.image.packages }}
-            PYTHON_LIBRARIES=${{ matrix.image.python-libraries }}
+            PYTHON_LIBRARIES=${{ matrix.image.python-libs }}

+      - name: Install regctl
+        if: startsWith(github.ref, 'refs/tags/v')
+        uses: regclient/actions/regctl-installer@main
+
+      - name: Retag to minor semver version
+        if: startsWith(github.ref, 'refs/tags/v') && steps.vars.outputs.minor_semver != ''
+        run: |
+          regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.minor_semver, matrix.image.name) }}
+
+      - name: Retag to latest
+        if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-latest == 'true'
+        run: |
+          regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
+
+  end:
+    runs-on: ubuntu-latest
+    needs:
+      - docker
+    if: always()
+    env:
+      SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
+    steps:
+
+      # Slack
+      - name: Slack notification
+        uses: Gamesight/slack-workflow-status@master
+        if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
+        with:
+          repo_token: ${{ secrets.GITHUB_TOKEN }}
+          slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
+          name: GitHub Actions
+          icon_emoji: ':github-actions:'
+          channel: 'C02DQ1A7JLR' # _int_git channel
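In the `Set image name` step, `${TAG%.*}` strips the shortest suffix matching `.*`, so a release tag such as `v0.24.5` yields the `v0.24` tag used by the minor-semver retag. A Java analogue of that derivation (a hypothetical helper, shown only to make the shell expansion concrete):

```java
// Same effect as the shell expansion MINOR_SEMVER=${TAG%.*}:
// drop everything from the last '.' onward, e.g. "v0.24.5" -> "v0.24".
static String minorSemver(String tag) {
    int lastDot = tag.lastIndexOf('.');
    return lastDot == -1 ? tag : tag.substring(0, lastDot);
}
```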
.github/workflows/workflow-publish-maven.yml (2 changes, vendored)

@@ -25,7 +25,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Checkout - Current ref
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5

       # Setup build
       - name: Setup - Build
.github/workflows/workflow-release.yml (19 changes, vendored)

@@ -4,7 +4,7 @@ on:
   workflow_dispatch:
     inputs:
       plugin-version:
-        description: "Kestra version"
+        description: "plugins version"
         default: 'LATEST'
         required: false
         type: string
@@ -16,7 +16,7 @@ on:
   workflow_call:
     inputs:
       plugin-version:
-        description: "Kestra version"
+        description: "plugins version"
         default: 'LATEST'
         required: false
         type: string
@@ -42,21 +42,25 @@ on:
       SONATYPE_GPG_FILE:
         description: "The Sonatype GPG file."
         required: true
+      GH_PERSONAL_TOKEN:
+        description: "GH personnal Token."
+        required: true
+      SLACK_RELEASES_WEBHOOK_URL:
+        description: "Slack webhook for releases channel."
+        required: true

 jobs:
   build-artifacts:
     name: Build - Artifacts
     uses: ./.github/workflows/workflow-build-artifacts.yml
-    with:
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}

   Docker:
     name: Publish Docker
     needs: build-artifacts
     uses: ./.github/workflows/workflow-publish-docker.yml
-    if: startsWith(github.ref, 'refs/heads/develop') || github.event.inputs.publish-docker == 'true'
+    if: github.ref == 'refs/heads/develop' || inputs.publish-docker == 'true'
     with:
       force-download-artifact: 'false'
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
+      plugin-version: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
     secrets:
       DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
       DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
@@ -77,4 +81,5 @@ jobs:
     if: startsWith(github.ref, 'refs/tags/v')
     uses: ./.github/workflows/workflow-github-release.yml
     secrets:
-      GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
+      GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
+      SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
.github/workflows/workflow-test.yml (2 changes, vendored)

@@ -27,7 +27,7 @@ jobs:
       ui: ${{ steps.changes.outputs.ui }}
       backend: ${{ steps.changes.outputs.backend }}
     steps:
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
         if: "!startsWith(github.ref, 'refs/tags/v')"
       - uses: dorny/paths-filter@v3
         if: "!startsWith(github.ref, 'refs/tags/v')"
.plugins (5 changes)

@@ -87,13 +87,18 @@
 #plugin-powerbi:io.kestra.plugin:plugin-powerbi:LATEST
 #plugin-pulsar:io.kestra.plugin:plugin-pulsar:LATEST
 #plugin-redis:io.kestra.plugin:plugin-redis:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-bun:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-deno:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-go:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-groovy:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-jbang:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-julia:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-jython:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-lua:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-nashorn:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-node:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-perl:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-php:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-powershell:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-python:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-r:LATEST
@@ -18,6 +18,10 @@ micronaut:
     root:
       paths: classpath:root
       mapping: /**
+  codec:
+    json:
+      additional-types:
+        - application/scim+json
   server:
     max-request-size: 10GB
     multipart:
@@ -78,8 +82,19 @@ micronaut:
       type: scheduled
       core-pool-size: 1

-  # Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
   metrics:
+    binders:
+      retry:
+        enabled: true
+      netty:
+        queues:
+          enabled: true
+        bytebuf-allocators:
+          enabled: true
+        channels:
+          enabled: true
+
+    # Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
     export:
       otlp:
         enabled: false
@@ -92,6 +107,8 @@ jackson:
   serialization-inclusion: non_null
   deserialization:
     FAIL_ON_UNKNOWN_PROPERTIES: false
+  mapper:
+    ACCEPT_CASE_INSENSITIVE_ENUMS: true

 endpoints:
   all:
@@ -100,6 +117,10 @@ endpoints:
     sensitive: false
   health:
     details-visible: ANONYMOUS
+    disk-space:
+      enabled: false
+    discovery-client:
+      enabled: false
   loggers:
     write-sensitive: false
   env:
@@ -133,12 +154,44 @@ kestra:
   tutorial-flows:
     # Automatically loads all tutorial flows at startup.
     enabled: true

+  retries:
+    attempts: 5
+    multiplier: 2.0
+    delay: 1s
+    maxDelay: ""
+
+  server:
+    basic-auth:
+      # These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
+      open-urls:
+        - "/ping"
+        - "/api/v1/executions/webhook/"
+
+    preview:
+      initial-rows: 100
+      max-rows: 5000
+
+    # The expected time for this server to complete all its tasks before initiating a graceful shutdown.
+    terminationGracePeriod: 5m
+    workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
+    # Configuration for Liveness and Heartbeat mechanism between servers.
+    liveness:
+      enabled: true
+      # The expected time between liveness probe.
+      interval: 10s
+      # The timeout used to detect service failures.
+      timeout: 1m
+      # The time to wait before executing a liveness probe.
+      initialDelay: 1m
+      # The expected time between service heartbeats.
+      heartbeatInterval: 3s
+    service:
+      purge:
+        initial-delay: 1h
+        fixed-delay: 1d
+        retention: 30d
+
   jdbc:
     queues:
       min-poll-interval: 25ms
@@ -150,7 +203,7 @@ kestra:
       fixed-delay: 1h
       retention: 7d
       types:
-        - type : io.kestra.core.models.executions.LogEntry
+        - type: io.kestra.core.models.executions.LogEntry
           retention: 1h
         - type: io.kestra.core.models.executions.MetricEntry
           retention: 1h
@@ -182,37 +235,12 @@ kestra:
     traces:
       root: DISABLED

-  server:
-    basic-auth:
-      # These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
-      open-urls:
-        - "/ping"
-        - "/api/v1/executions/webhook/"
-    preview:
-      initial-rows: 100
-      max-rows: 5000
-    # The expected time for this server to complete all its tasks before initiating a graceful shutdown.
-    terminationGracePeriod: 5m
-    workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
-    # Configuration for Liveness and Heartbeat mechanism between servers.
-    liveness:
-      enabled: true
-      # The expected time between liveness probe.
-      interval: 10s
-      # The timeout used to detect service failures.
-      timeout: 1m
-      # The time to wait before executing a liveness probe.
-      initialDelay: 1m
-      # The expected time between service heartbeats.
-      heartbeatInterval: 3s
-    service:
-      purge:
-        initial-delay: 1h
-        fixed-delay: 1d
-        retention: 30d
   ui-anonymous-usage-report:
     enabled: true

   anonymous-usage-report:
     enabled: true
-    uri: https://api.kestra.io/v1/reports/usages
+    uri: https://api.kestra.io/v1/reports/server-events
     initial-delay: 5m
     fixed-delay: 1h
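The new `kestra.retries` block reads as an exponential backoff: a first delay of `1s`, doubled on each attempt (`multiplier: 2.0`), for at most `attempts: 5` and with no cap (`maxDelay: ""`). A small sketch of the schedule this implies, assuming the delay is multiplied after every failed attempt:

```java
import java.time.Duration;

// Backoff implied by attempts=5, delay=1s, multiplier=2.0, no maxDelay:
// waits of 1s, 2s, 4s, 8s, 16s between successive attempts.
public class RetryBackoff {
    public static void main(String[] args) {
        Duration delay = Duration.ofSeconds(1);
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.printf("attempt %d: wait %ds%n", attempt, delay.toSeconds());
            delay = Duration.ofMillis((long) (delay.toMillis() * 2.0));
        }
    }
}
```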
@@ -63,6 +63,10 @@ dependencies {
         exclude group: 'com.fasterxml.jackson.core'
     }

+    // micrometer
+    implementation "io.micronaut.micrometer:micronaut-micrometer-observation"
+    implementation 'io.micrometer:micrometer-java21'
+
     // test
     testAnnotationProcessor project(':processor')
     testImplementation project(':tests')
@@ -122,12 +122,13 @@ public class JsonSchemaGenerator {
         if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
             List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
                 .map(JsonNode::asText)
-                .toList();
+                .collect(Collectors.toList());

             properties.fields().forEachRemaining(e -> {
                 int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
                 if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
                     requiredPropsNode.remove(indexInRequiredArray);
+                    requiredFieldValues.remove(indexInRequiredArray);
                 }
             });
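The switch from `toList()` to `collect(Collectors.toList())` is what makes the added `requiredFieldValues.remove(indexInRequiredArray)` legal: `Stream.toList()` returns an unmodifiable list, whereas `Collectors.toList()` in practice returns a mutable `ArrayList`, and removing from both structures keeps their indices aligned on later iterations. A standalone sketch of the pitfall:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MutableListPitfall {
    public static void main(String[] args) {
        List<String> frozen = Stream.of("a", "b", "c").toList();
        // frozen.remove(0); // Stream.toList() is unmodifiable: this would throw

        // Collectors.toList() currently yields a mutable ArrayList, so removal works
        // and the list can stay index-aligned with another structure being pruned:
        List<String> required = Stream.of("a", "b", "c").collect(Collectors.toList());
        int idx = required.indexOf("b");
        if (idx != -1) {
            required.remove(idx);
        }
        System.out.println(frozen + " vs " + required); // [a, b, c] vs [a, c]
    }
}
```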
@@ -6,8 +6,14 @@ import io.kestra.core.http.HttpRequest;
 import io.kestra.core.http.HttpResponse;
 import io.kestra.core.http.client.apache.*;
 import io.kestra.core.http.client.configurations.HttpConfiguration;
+import io.kestra.core.runners.DefaultRunContext;
 import io.kestra.core.runners.RunContext;
 import io.kestra.core.serializers.JacksonMapper;
+import io.micrometer.common.KeyValues;
+import io.micrometer.core.instrument.binder.httpcomponents.hc5.ApacheHttpClientContext;
+import io.micrometer.core.instrument.binder.httpcomponents.hc5.DefaultApacheHttpClientObservationConvention;
+import io.micrometer.core.instrument.binder.httpcomponents.hc5.ObservationExecChainHandler;
+import io.micrometer.observation.ObservationRegistry;
 import io.micronaut.http.MediaType;
 import jakarta.annotation.Nullable;
 import lombok.Builder;
@@ -16,6 +22,7 @@ import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hc.client5.http.ContextBuilder;
 import org.apache.hc.client5.http.auth.*;
 import org.apache.hc.client5.http.config.ConnectionConfig;
+import org.apache.hc.client5.http.impl.ChainElement;
 import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy;
 import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
 import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
@@ -50,11 +57,16 @@ public class HttpClient implements Closeable {
     private transient CloseableHttpClient client;
     private final RunContext runContext;
     private final HttpConfiguration configuration;
+    private ObservationRegistry observationRegistry;

     @Builder
     public HttpClient(RunContext runContext, @Nullable HttpConfiguration configuration) throws IllegalVariableEvaluationException {
         this.runContext = runContext;
         this.configuration = configuration == null ? HttpConfiguration.builder().build() : configuration;
+        if (runContext instanceof DefaultRunContext defaultRunContext) {
+            this.observationRegistry = defaultRunContext.getApplicationContext().findBean(ObservationRegistry.class).orElse(null);
+        }

         this.client = this.createClient();
     }
@@ -67,6 +79,13 @@ public class HttpClient implements Closeable {
             .disableDefaultUserAgent()
             .setUserAgent("Kestra");

+        if (observationRegistry != null) {
+            // micrometer, must be placed before the retry strategy (see https://docs.micrometer.io/micrometer/reference/reference/httpcomponents.html#_retry_strategy_considerations)
+            builder.addExecInterceptorAfter(ChainElement.RETRY.name(), "micrometer",
+                new ObservationExecChainHandler(observationRegistry, new CustomApacheHttpClientObservationConvention())
+            );
+        }
+
         // logger
         if (this.configuration.getLogs() != null && this.configuration.getLogs().length > 0) {
             if (ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.REQUEST_HEADERS) ||
@@ -297,4 +316,14 @@ public class HttpClient implements Closeable {
             this.client.close();
         }
     }
+
+    public static class CustomApacheHttpClientObservationConvention extends DefaultApacheHttpClientObservationConvention {
+        @Override
+        public KeyValues getLowCardinalityKeyValues(ApacheHttpClientContext context) {
+            return KeyValues.concat(
+                super.getLowCardinalityKeyValues(context),
+                KeyValues.of("type", "core-client")
+            );
+        }
+    }
 }
@@ -0,0 +1,34 @@
package io.kestra.core.metrics;

import io.micrometer.core.instrument.binder.jvm.JvmThreadDeadlockMetrics;
import io.micrometer.java21.instrument.binder.jdk.VirtualThreadMetrics;
import io.micronaut.configuration.metrics.annotation.RequiresMetrics;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;

import static io.micronaut.configuration.metrics.micrometer.MeterRegistryFactory.MICRONAUT_METRICS_BINDERS;
import static io.micronaut.core.util.StringUtils.FALSE;

@Factory
@RequiresMetrics
public class MeterRegistryBinderFactory {
    @Bean
    @Primary
    @Singleton
    @Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
    public VirtualThreadMetrics virtualThreadMetrics() {
        return new VirtualThreadMetrics();
    }

    @Bean
    @Primary
    @Singleton
    @Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
    public JvmThreadDeadlockMetrics threadDeadlockMetricsMetrics() {
        return new JvmThreadDeadlockMetrics();
    }
}
@@ -1040,6 +1040,16 @@ public class Execution implements DeletedInterface, TenantInterface {
         return result;
     }

+    /**
+     * Find all children of this {@link TaskRun}.
+     */
+    public List<TaskRun> findChildren(TaskRun parentTaskRun) {
+        return taskRunList.stream()
+            .filter(taskRun -> parentTaskRun.getId().equals(taskRun.getParentTaskRunId()))
+            .toList();
+    }
+
     public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
         return (withCurrent ?
             Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :
@@ -116,7 +116,7 @@ public class State {
     }

     public Instant maxDate() {
-        if (this.histories.size() == 0) {
+        if (this.histories.isEmpty()) {
             return Instant.now();
         }
@@ -124,7 +124,7 @@ public class State {
     }

     public Instant minDate() {
-        if (this.histories.size() == 0) {
+        if (this.histories.isEmpty()) {
             return Instant.now();
         }
@@ -173,6 +173,11 @@ public class State {
         return this.current.isBreakpoint();
     }

+    @JsonIgnore
+    public boolean isQueued() {
+        return this.current.isQueued();
+    }
+
     @JsonIgnore
     public boolean isRetrying() {
         return this.current.isRetrying();
@@ -206,6 +211,14 @@ public class State {
         return this.histories.get(this.histories.size() - 2).state.isPaused();
     }

+    /**
+     * Return true if the execution has failed, then was restarted.
+     * This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
+     */
+    public boolean failedThenRestarted() {
+        return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
+    }
+
     @Introspected
     public enum Type {
         CREATED,
@@ -264,6 +277,10 @@ public class State {
             return this == Type.KILLED;
         }

+        public boolean isQueued(){
+            return this == Type.QUEUED;
+        }
+
         /**
          * @return states that are terminal to an execution
          */
@@ -68,6 +68,19 @@ public class Property<T> {
     String getExpression() {
         return expression;
     }

+    /**
+     * Returns a new {@link Property} with no cached rendered value,
+     * so that the next render will evaluate its original Pebble expression.
+     * <p>
+     * The returned property will still cache its rendered result.
+     * To re-evaluate on a subsequent render, call {@code skipCache()} again.
+     *
+     * @return a new {@link Property} without a pre-rendered value
+     */
+    public Property<T> skipCache() {
+        return Property.ofExpression(expression);
+    }
+
     /**
      * Build a new Property object with a value already set.<br>
@@ -5,6 +5,7 @@ import io.kestra.core.models.Pauseable;
 import io.kestra.core.utils.Either;

 import java.io.Closeable;
+import java.util.List;
 import java.util.function.Consumer;

 public interface QueueInterface<T> extends Closeable, Pauseable {
@@ -18,7 +19,15 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
         emitAsync(null, message);
     }

-    void emitAsync(String consumerGroup, T message) throws QueueException;
+    default void emitAsync(String consumerGroup, T message) throws QueueException {
+        emitAsync(consumerGroup, List.of(message));
+    }
+
+    default void emitAsync(List<T> messages) throws QueueException {
+        emitAsync(null, messages);
+    }
+
+    void emitAsync(String consumerGroup, List<T> messages) throws QueueException;

     default void delete(T message) throws QueueException {
         delete(null, message);
@@ -27,7 +36,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
     void delete(String consumerGroup, T message) throws QueueException;

     default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
-        return receive((String) null, consumer);
+        return receive(null, consumer, false);
     }

     default Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {
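With this change, only the batch `emitAsync(String, List<T>)` remains abstract; the single-message variants are defaults that wrap the message in `List.of(...)`. A minimal sketch of what an implementation now has to provide (an illustrative in-memory queue, not the real JDBC or Kafka implementation):

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative only: a queue that satisfies the narrowed contract by implementing
// just the batch method; the interface's default methods route
// emitAsync(message) and emitAsync(group, message) through List.of(message).
class InMemoryQueue<T> {
    private final Queue<T> backing = new ConcurrentLinkedQueue<>();

    // the one method the interface still requires
    public void emitAsync(String consumerGroup, List<T> messages) {
        backing.addAll(messages);
    }

    // mirrors the default single-message overload
    public void emitAsync(T message) {
        emitAsync(null, List.of(message));
    }
}
```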
@@ -0,0 +1,12 @@
package io.kestra.core.queues;

import java.io.Serial;

public class UnsupportedMessageException extends QueueException {
    @Serial
    private static final long serialVersionUID = 1L;

    public UnsupportedMessageException(String message, Throwable cause) {
        super(message, cause);
    }
}
@@ -161,7 +161,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
     }

     List<Execution> lastExecutions(
-        @Nullable String tenantId,
+        String tenantId,
         @Nullable List<FlowFilter> flows
     );
 }
@@ -86,7 +86,7 @@ public class Executor {

     public Boolean canBeProcessed() {
         return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
-            this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint());
+            this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
     }

     public Executor withFlow(FlowWithSource flow) {
@@ -237,9 +237,9 @@ public class ExecutorService {
             try {
                 state = flowableParent.resolveState(runContext, execution, parentTaskRun);
             } catch (Exception e) {
-                // This will lead to the next task being still executed but at least Kestra will not crash.
+                // This will lead to the next task being still executed, but at least Kestra will not crash.
                 // This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
-                runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), e);
+                runContext.logger().error("Unable to resolve state from the Flowable task: {}", e.getMessage(), e);
                 state = Optional.of(State.Type.FAILED);
             }
             Optional<WorkerTaskResult> endedTask = childWorkerTaskTypeToWorkerTask(
@@ -589,6 +589,23 @@ public class ExecutorService {
                 list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId()))
                     .collect(Collectors.toCollection(ArrayList::new));
             }
+
+            // If the task is a flowable and its terminated, check that all children are terminated.
+            // This may not be the case for parallel flowable tasks like Parallel, Dag, ForEach...
+            // After a fail task, some child flowable may not be correctly terminated.
+            if (task instanceof FlowableTask<?> && taskRun.getState().isTerminated()) {
+                List<TaskRun> updated = executor.getExecution().findChildren(taskRun).stream()
+                    .filter(child -> !child.getState().isTerminated())
+                    .map(throwFunction(child -> child.withState(taskRun.getState().getCurrent())))
+                    .toList();
+                if (!updated.isEmpty()) {
+                    Execution execution = executor.getExecution();
+                    for (TaskRun child : updated) {
+                        execution = execution.withTaskRun(child);
+                    }
+                    executor = executor.withExecution(execution, "handledTerminatedFlowableTasks");
+                }
+            }
         }

         metricRegistry
@@ -29,7 +29,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;

 public class RunContextLogger implements Supplier<org.slf4j.Logger> {
-    private static final int MAX_MESSAGE_LENGTH = 1024 * 10;
+    private static final int MAX_MESSAGE_LENGTH = 1024 * 15;
     public static final String ORIGINAL_TIMESTAMP_KEY = "originalTimestamp";

     private final String loggerName;
@@ -80,7 +80,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
         }

         List<LogEntry> result = new ArrayList<>();
-        long i = 0;
         for (String s : split) {
             result.add(LogEntry.builder()
                 .namespace(logEntry.getNamespace())
@@ -98,7 +97,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
                 .thread(event.getThreadName())
                 .build()
             );
-            i++;
         }

         return result;
@@ -331,14 +329,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
         protected void append(ILoggingEvent e) {
             e = this.transform(e);

-            logEntries(e, logEntry)
-                .forEach(l -> {
-                    try {
-                        logQueue.emitAsync(l);
-                    } catch (QueueException ex) {
-                        log.warn("Unable to emit logQueue", ex);
-                    }
-                });
+            try {
+                logQueue.emitAsync(logEntries(e, logEntry));
+            } catch (QueueException ex) {
+                log.warn("Unable to emit logQueue", ex);
+            }
         }
     }
@@ -4,15 +4,11 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static io.kestra.core.utils.Rethrow.throwFunction;

@@ -27,12 +23,19 @@ public class RunContextProperty<T> {
    private final RunContext runContext;
    private final Task task;
    private final AbstractTrigger trigger;

    private final boolean skipCache;

    RunContextProperty(Property<T> property, RunContext runContext) {
        this(property, runContext, false);
    }

    RunContextProperty(Property<T> property, RunContext runContext, boolean skipCache) {
        this.property = property;
        this.runContext = runContext;
        this.task = ((DefaultRunContext) runContext).getTask();
        this.trigger = ((DefaultRunContext) runContext).getTrigger();
        this.skipCache = skipCache;
    }

    private void validate() {
@@ -45,6 +48,19 @@ public class RunContextProperty<T> {
            log.trace("Unable to do validation: no task or trigger found");
        }
    }

    /**
     * Returns a new {@link RunContextProperty} that will always be rendered by evaluating
     * its original Pebble expression, without using any previously cached value.
     * <p>
     * This ensures that each time the property is rendered, the underlying
     * expression is re-evaluated to produce a fresh result.
     *
     * @return a new {@link Property} that bypasses the cache
     */
    public RunContextProperty<T> skipCache() {
        return new RunContextProperty<>(this.property, this.runContext, true);
    }

    /**
     * Render a property then convert it to its target type and validate it.<br>
@@ -55,13 +71,13 @@ public class RunContextProperty<T> {
     * Warning, due to the caching mechanism, this method is not thread-safe.
     */
    public Optional<T> as(Class<T> clazz) throws IllegalVariableEvaluationException {
        var as = Optional.ofNullable(this.property)
        var as = Optional.ofNullable(getProperty())
            .map(throwFunction(prop -> Property.as(prop, this.runContext, clazz)));

        validate();
        return as;
    }

    /**
     * Render a property with additional variables, then convert it to its target type and validate it.<br>
     *
@@ -71,7 +87,7 @@ public class RunContextProperty<T> {
     * Warning, due to the caching mechanism, this method is not thread-safe.
     */
    public Optional<T> as(Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
        var as = Optional.ofNullable(this.property)
        var as = Optional.ofNullable(getProperty())
            .map(throwFunction(prop -> Property.as(prop, this.runContext, clazz, variables)));

        validate();
@@ -89,7 +105,7 @@ public class RunContextProperty<T> {
     */
    @SuppressWarnings("unchecked")
    public <I> T asList(Class<I> itemClazz) throws IllegalVariableEvaluationException {
        var as = Optional.ofNullable(this.property)
        var as = Optional.ofNullable(getProperty())
            .map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz)))
            .orElse((T) Collections.emptyList());

@@ -108,7 +124,7 @@ public class RunContextProperty<T> {
     */
    @SuppressWarnings("unchecked")
    public <I> T asList(Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
        var as = Optional.ofNullable(this.property)
        var as = Optional.ofNullable(getProperty())
            .map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz, variables)))
            .orElse((T) Collections.emptyList());

@@ -127,7 +143,7 @@ public class RunContextProperty<T> {
     */
    @SuppressWarnings("unchecked")
    public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
        var as = Optional.ofNullable(this.property)
        var as = Optional.ofNullable(getProperty())
            .map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass)))
            .orElse((T) Collections.emptyMap());

@@ -146,11 +162,15 @@ public class RunContextProperty<T> {
     */
    @SuppressWarnings("unchecked")
    public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
        var as = Optional.ofNullable(this.property)
        var as = Optional.ofNullable(getProperty())
            .map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass, variables)))
            .orElse((T) Collections.emptyMap());

        validate();
        return as;
    }

    private Property<T> getProperty() {
        return skipCache ? this.property.skipCache() : this.property;
    }
}

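A minimal usage sketch of the cache-bypass semantics above, assuming a task field named `message` of type Property<String> (the field name and variable values are illustrative; the behavior mirrors the RunContextPropertyTest cases further down):

    // First render caches the rendered result for this property.
    Optional<String> first = runContext.render(this.message).as(String.class, Map.of("variable", "value1"));  // "value1"
    // A second render returns the cached value even though the variables changed.
    Optional<String> cached = runContext.render(this.message).as(String.class, Map.of("variable", "value2")); // still "value1"
    // skipCache() forces the Pebble expression to be re-evaluated against the new variables.
    Optional<String> fresh = runContext.render(this.message).skipCache().as(String.class, Map.of("variable", "value2")); // "value2"
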
@@ -508,14 +508,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
    Execution execution = workerTrigger.getTrigger().isFailOnTriggerError() ? TriggerService.generateExecution(workerTrigger.getTrigger(), workerTrigger.getConditionContext(), workerTrigger.getTriggerContext(), (Output) null)
        .withState(FAILED) : null;
    if (execution != null) {
        RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution))
            .forEach(log -> {
                try {
                    logQueue.emitAsync(log);
                } catch (QueueException ex) {
                    // fail silently
                }
            });
        try {
            logQueue.emitAsync(RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution)));
        } catch (QueueException ex) {
            // fail silently
        }
    }
    this.workerTriggerResultQueue.emit(
        WorkerTriggerResult.builder()
@@ -764,6 +761,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
    workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));

    WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);

    this.workerTaskResultQueue.emit(workerTaskResult);

    // upload the cache file; the hash may not be present if we didn't succeed in computing it
@@ -796,6 +794,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
        // If the message is too big, we remove the outputs
        failed = failed.withOutputs(Variables.empty());
    }
    if (e instanceof UnsupportedMessageException) {
        // we expect the offending character to be in the outputs, so we remove them
        failed = failed.withOutputs(Variables.empty());
    }
    WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
    RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
    contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
@@ -818,7 +820,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
    private Optional<String> hashTask(RunContext runContext, Task task) {
        try {
            var map = JacksonMapper.toMap(task);
            var rMap = runContext.render(map);
            // If there are task-provided variables, rendering the task may fail.
            // The best we can do is to add a fake 'workingDir', as it's an often-added variable,
            // and it should not be part of the task hash.
            Map<String, Object> variables = Map.of("workingDir", "workingDir");
            var rMap = runContext.render(map, variables);
            var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            digest.update(json);

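The hashing scheme in hashTask can be reproduced in isolation. A minimal sketch, assuming the rendered task map has already been serialized to JSON bytes; the hex encoding of the digest is an assumption, since the hunk above is cut off before the encoding step:

    import java.security.MessageDigest;
    import java.util.HexFormat;

    static String taskHash(byte[] renderedTaskJson) throws Exception {
        // SHA-256 over the JSON-serialized, rendered task definition
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        digest.update(renderedTaskJson);
        return HexFormat.of().formatHex(digest.digest()); // encoding step is an assumption
    }
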
@@ -8,6 +8,7 @@ import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
@@ -29,10 +30,7 @@ import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.*;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.*;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.util.CollectionUtils;
@@ -91,7 +89,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
    private volatile Boolean isReady = false;

    private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> scheduledFuture;
    private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> executionMonitorFuture;

    @Getter
    protected SchedulerTriggerStateInterface triggerState;
@@ -152,7 +152,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
    this.flowListeners.run();
    this.flowListeners.listen(this::initializedTriggers);

    ScheduledFuture<?> evaluationLoop = scheduleExecutor.scheduleAtFixedRate(
    scheduledFuture = scheduleExecutor.scheduleAtFixedRate(
        this::handle,
        0,
        1,
@@ -162,10 +162,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
    // look at exceptions on the evaluation loop thread
    Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
        () -> {
            Await.until(evaluationLoop::isDone);
            Await.until(scheduledFuture::isDone);

            try {
                evaluationLoop.get();
                scheduledFuture.get();
            } catch (CancellationException ignored) {

            } catch (ExecutionException | InterruptedException e) {
@@ -177,7 +177,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
    );

    // Periodically report metrics and logs of running executions
    ScheduledFuture<?> monitoringLoop = executionMonitorExecutor.scheduleWithFixedDelay(
    executionMonitorFuture = executionMonitorExecutor.scheduleWithFixedDelay(
        this::executionMonitor,
        30,
        10,
@@ -187,10 +187,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
    // look at exceptions on the monitoring loop thread
    Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
        () -> {
            Await.until(monitoringLoop::isDone);
            Await.until(executionMonitorFuture::isDone);

            try {
                monitoringLoop.get();
                executionMonitorFuture.get();
            } catch (CancellationException ignored) {

            } catch (ExecutionException | InterruptedException e) {
@@ -318,7 +318,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
    }

    synchronized (this) { // we need a sync block: we read then update, so this must not run in multiple threads concurrently
        List<Trigger> triggers = triggerState.findAllForAllTenants();
        Map<String, Trigger> triggers = triggerState.findAllForAllTenants().stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));

        flows
            .stream()
@@ -328,7 +328,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
            .flatMap(flow -> flow.getTriggers().stream().filter(trigger -> trigger instanceof WorkerTriggerInterface).map(trigger -> new FlowAndTrigger(flow, trigger)))
            .distinct()
            .forEach(flowAndTrigger -> {
                Optional<Trigger> trigger = triggers.stream().filter(t -> t.uid().equals(Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger()))).findFirst(); // must have one or none
                String triggerUid = Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger());
                Optional<Trigger> trigger = Optional.ofNullable(triggers.get(triggerUid));
                if (trigger.isEmpty()) {
                    RunContext runContext = runContextFactory.of(flowAndTrigger.flow(), flowAndTrigger.trigger());
                    ConditionContext conditionContext = conditionService.conditionContext(runContext, flowAndTrigger.flow(), null);
@@ -467,9 +468,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {

    private List<FlowWithTriggers> computeSchedulable(List<FlowWithSource> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
        List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
        List<String> flowIds = flows.stream().map(FlowId::uidWithoutRevision).toList();
        Map<String, Trigger> triggerById = triggerContextsToEvaluate.stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));

        // delete triggers whose flow has been deleted
        triggerContextsToEvaluate.stream()
            .filter(trigger -> !flows.stream().map(FlowId::uidWithoutRevision).toList().contains(FlowId.uid(trigger)))
            .filter(trigger -> !flowIds.contains(FlowId.uid(trigger)))
            .forEach(trigger -> {
                try {
                    this.triggerState.delete(trigger);
@@ -491,12 +495,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
            .map(abstractTrigger -> {
                RunContext runContext = runContextFactory.of(flow, abstractTrigger);
                ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
                Trigger triggerContext = null;
                Trigger lastTrigger = triggerContextsToEvaluate
                    .stream()
                    .filter(triggerContextToFind -> triggerContextToFind.uid().equals(Trigger.uid(flow, abstractTrigger)))
                    .findFirst()
                    .orElse(null);
                Trigger triggerContext;
                Trigger lastTrigger = triggerById.get(Trigger.uid(flow, abstractTrigger));
                // If a trigger is not found in the triggers to evaluate, then we ignore it
                if (lastTrigger == null) {
                    return null;
@@ -1006,8 +1006,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {

    setState(ServiceState.TERMINATING);
    this.receiveCancellations.forEach(Runnable::run);
    this.scheduleExecutor.shutdown();
    this.executionMonitorExecutor.shutdown();
    ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));
    ExecutorsUtils.closeScheduledThreadPool(executionMonitorExecutor, Duration.ofSeconds(5), List.of(executionMonitorFuture));
    try {
        if (onClose != null) {
            onClose.run();

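The recurring pattern in this scheduler refactor is the same in both hunks: replace a per-item linear scan over the trigger list with a single uid-keyed map built once. A minimal sketch of the idea, reusing only the names that appear in the diff:

    // Build the index once: O(n)
    Map<String, Trigger> triggerById = triggerContextsToEvaluate.stream()
        .collect(Collectors.toMap(HasUID::uid, Function.identity()));

    // Each lookup is then O(1) instead of a full stream scan per flow/trigger pair
    Trigger lastTrigger = triggerById.get(Trigger.uid(flow, abstractTrigger));
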
@@ -1,6 +1,7 @@
package io.kestra.core.server;

import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@@ -8,9 +9,11 @@ import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -25,6 +28,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
    protected final ServerConfig serverConfig;
    private final AtomicBoolean isStopped = new AtomicBoolean(false);
    private ScheduledExecutorService scheduledExecutorService;
    private ScheduledFuture<?> scheduledFuture;
    private Instant lastScheduledExecution;

    /**
@@ -98,7 +102,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
    scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
    Duration scheduleInterval = getScheduleInterval();
    log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval);
    scheduledExecutorService.scheduleAtFixedRate(
    scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
        this,
        0,
        scheduleInterval.toSeconds(),
@@ -133,20 +137,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
    @Override
    public void close() {
        if (isStopped.compareAndSet(false, true) && scheduledExecutorService != null) {
            scheduledExecutorService.shutdown();
            if (scheduledExecutorService.isTerminated()) {
                return;
            }
            try {
                if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
                    log.debug("Failed to wait for scheduled '{}' task termination. Cause: Timeout", name);
                }
                log.debug("Stopped scheduled '{}' task.", name);
            } catch (InterruptedException e) {
                scheduledExecutorService.shutdownNow();
                Thread.currentThread().interrupt();
                log.debug("Failed to wait for scheduled '{}' task termination. Cause: Interrupted.", name);
            }
            ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(scheduledFuture));
        }
    }
}

@@ -317,6 +317,32 @@ public class ExecutionService {
        return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
    }

    public Execution changeTaskRunState(final Execution execution, Flow flow, String taskRunId, State.Type newState) throws Exception {
        Execution newExecution = markAs(execution, flow, taskRunId, newState);

        // if the execution was terminated, it could have executed errors/finally/afterExecution tasks;
        // we must remove them, as the execution will be restarted
        if (execution.getState().isTerminated()) {
            List<TaskRun> newTaskRuns = newExecution.getTaskRunList();
            // We need to remove global error tasks and flowable error tasks if any
            flow
                .allErrorsWithChildren()
                .forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));

            // We need to remove global finally tasks and flowable finally tasks if any
            flow
                .allFinallyWithChildren()
                .forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));

            // We need to remove afterExecution tasks
            ListUtils.emptyOnNull(flow.getAfterExecution())
                .forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));

            return newExecution.withTaskRunList(newTaskRuns);
        } else {
            return newExecution;
        }
    }

    public Execution markAs(final Execution execution, FlowInterface flow, String taskRunId, State.Type newState) throws Exception {
        return this.markAs(execution, flow, taskRunId, newState, null, null);
    }

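A minimal usage sketch, mirroring the changeStateWithErrorBranch test later in this changeset (flow and task ids come from that test): flip a failed task run to SUCCESS so the terminated execution can be restarted without its error-branch task runs.

    Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "change-state-errors");
    Flow flow = flowRepository.findByExecution(execution);

    // error/finally/afterExecution task runs are dropped because the execution had terminated
    Execution restart = executionService.changeTaskRunState(
        execution,
        flow,
        execution.findTaskRunsByTaskId("make_error").getFirst().getId(),
        State.Type.SUCCESS
    );
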
@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ModelValidator;
@@ -51,7 +52,6 @@ import java.util.stream.StreamSupport;
@Singleton
@Slf4j
public class FlowService {

    @Inject
    Optional<FlowRepositoryInterface> flowRepository;

@@ -236,6 +236,7 @@ public class FlowService {
    }

    List<String> warnings = new ArrayList<>(checkValidSubflows(flow, tenantId));

    List<io.kestra.plugin.core.trigger.Flow> flowTriggers = ListUtils.emptyOnNull(flow.getTriggers()).stream()
        .filter(io.kestra.plugin.core.trigger.Flow.class::isInstance)
        .map(io.kestra.plugin.core.trigger.Flow.class::cast)
@@ -246,6 +247,21 @@ public class FlowService {
        }
    });

    // add a warning for runnable-only properties (timeout, workerGroup, taskCache) used on a non-runnable task
    flow.allTasksWithChilds().forEach(task -> {
        if (!(task instanceof RunnableTask<?>)) {
            if (task.getTimeout() != null) {
                warnings.add("The task '" + task.getId() + "' cannot use the 'timeout' property as it's only relevant for runnable tasks.");
            }
            if (task.getTaskCache() != null) {
                warnings.add("The task '" + task.getId() + "' cannot use the 'taskCache' property as it's only relevant for runnable tasks.");
            }
            if (task.getWorkerGroup() != null) {
                warnings.add("The task '" + task.getId() + "' cannot use the 'workerGroup' property as it's only relevant for runnable tasks.");
            }
        }
    });

    return warnings;
}

@@ -1,6 +1,7 @@
package io.kestra.core.services;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
@@ -10,7 +11,6 @@ import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInte
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.ListUtils;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -24,14 +24,15 @@ import java.util.stream.Stream;

@Singleton
public class FlowTriggerService {
    @Inject
    private ConditionService conditionService;
    private final ConditionService conditionService;
    private final RunContextFactory runContextFactory;
    private final FlowService flowService;

    @Inject
    private RunContextFactory runContextFactory;

    @Inject
    private FlowService flowService;
    public FlowTriggerService(ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) {
        this.conditionService = conditionService;
        this.runContextFactory = runContextFactory;
        this.flowService = flowService;
    }

    // used in EE only
    public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<FlowWithSource> allFlows) {
@@ -53,6 +54,8 @@ public class FlowTriggerService {
    List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
        // prevent recursive flow triggers
        .filter(flow -> flowService.removeUnwanted(flow, execution))
        // filter out Test Executions
        .filter(flow -> execution.getKind() == null)
        // ensure flow & triggers are enabled
        .filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
        .filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())

@@ -173,18 +173,15 @@ public class PluginDefaultService {
    try {
        return this.injectAllDefaults(flow, false);
    } catch (Exception e) {
        RunContextLogger
            .logEntries(
                Execution.loggingEventFromException(e),
                LogEntry.of(execution)
            )
            .forEach(logEntry -> {
                try {
                    logQueue.emitAsync(logEntry);
                } catch (QueueException e1) {
                    // silently do nothing
                }
            });
        try {
            logQueue.emitAsync(RunContextLogger
                .logEntries(
                    Execution.loggingEventFromException(e),
                    LogEntry.of(execution)
                ));
        } catch (QueueException e1) {
            // silently do nothing
        }
        return readWithoutDefaultsOrThrow(flow);
    }
}

@@ -3,12 +3,16 @@ package io.kestra.core.utils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.*;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

@Singleton
@Slf4j
public class ExecutorsUtils {
    @Inject
    private ThreadMainFactoryBuilder threadFactoryBuilder;
@@ -61,6 +65,29 @@ public class ExecutorsUtils {
        );
    }

    public static void closeScheduledThreadPool(ScheduledExecutorService scheduledExecutorService, Duration gracePeriod, List<ScheduledFuture<?>> taskFutures) {
        scheduledExecutorService.shutdown();
        if (scheduledExecutorService.isTerminated()) {
            return;
        }

        try {
            if (!scheduledExecutorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
                log.warn("Failed to shutdown the ScheduledThreadPoolExecutor during grace period, forcing it to shutdown now");

                // Ensure the scheduled task reaches a terminal state to avoid possible memory leak
                ListUtils.emptyOnNull(taskFutures).forEach(taskFuture -> taskFuture.cancel(true));

                scheduledExecutorService.shutdownNow();
            }
            log.debug("Stopped scheduled ScheduledThreadPoolExecutor.");
        } catch (InterruptedException e) {
            scheduledExecutorService.shutdownNow();
            Thread.currentThread().interrupt();
            log.debug("Failed to shutdown the ScheduledThreadPoolExecutor.");
        }
    }

    private ExecutorService wrap(String name, ExecutorService executorService) {
        return ExecutorServiceMetrics.monitor(
            meterRegistry,

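A minimal usage sketch of the new helper, mirroring the scheduler and liveness-task call sites above: keep the ScheduledFuture returned at scheduling time, so the helper can cancel the task if the executor does not terminate within the grace period (the scheduled lambda is illustrative):

    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> { /* periodic work */ }, 0, 1, TimeUnit.SECONDS);

    // on shutdown: graceful for up to 5s, then cancel the task and force shutdownNow()
    ExecutorsUtils.closeScheduledThreadPool(executor, Duration.ofSeconds(5), List.of(future));
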
@@ -54,9 +54,10 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
        violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it.");
    }

    List<Task> allTasks = value.allTasksWithChilds();

    // tasks unique id
    List<String> taskIds = value.allTasksWithChilds()
        .stream()
    List<String> taskIds = allTasks.stream()
        .map(Task::getId)
        .toList();

@@ -72,8 +73,8 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
        violations.add("Duplicate trigger id with name [" + String.join(", ", duplicateIds) + "]");
    }

    value.allTasksWithChilds()
        .stream().filter(task -> task instanceof ExecutableTask<?> executableTask
    allTasks.stream()
        .filter(task -> task instanceof ExecutableTask<?> executableTask
            && value.getId().equals(executableTask.subflowId().flowId())
            && value.getNamespace().equals(executableTask.subflowId().namespace()))
        .forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));
@@ -102,7 +103,7 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
        .map(input -> Pattern.compile("\\{\\{\\s*inputs." + input.getId() + "\\s*\\}\\}"))
        .collect(Collectors.toList());

    List<String> invalidTasks = value.allTasks()
    List<String> invalidTasks = allTasks.stream()
        .filter(task -> checkObjectFieldsWithPatterns(task, inputsWithMinusPatterns))
        .map(task -> task.getId())
        .collect(Collectors.toList());
@@ -112,12 +113,12 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
        " [" + String.join(", ", invalidTasks) + "]");
    }

    List<Pattern> outputsWithMinusPattern = value.allTasks()
    List<Pattern> outputsWithMinusPattern = allTasks.stream()
        .filter(output -> Optional.ofNullable(output.getId()).orElse("").contains("-"))
        .map(output -> Pattern.compile("\\{\\{\\s*outputs\\." + output.getId() + "\\.[^}]+\\s*\\}\\}"))
        .collect(Collectors.toList());

    invalidTasks = value.allTasks()
    invalidTasks = allTasks.stream()
        .filter(task -> checkObjectFieldsWithPatterns(task, outputsWithMinusPattern))
        .map(task -> task.getId())
        .collect(Collectors.toList());

@@ -90,7 +90,7 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
    private static final String OUTPUTS_VAR = "outputs";

    @NotNull
    private Property<String> expression;
    private Property<Boolean> expression;

    /** {@inheritDoc} **/
    @SuppressWarnings("unchecked")
@@ -105,9 +105,8 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
        conditionContext.getVariables(),
        Map.of(TRIGGER_VAR, Map.of(OUTPUTS_VAR, conditionContext.getExecution().getOutputs()))
    );

    String render = conditionContext.getRunContext().render(expression).as(String.class, variables).orElseThrow();
    return !(render.isBlank() || render.trim().equals("false"));

    return conditionContext.getRunContext().render(expression).skipCache().as(Boolean.class, variables).orElseThrow();
}

private boolean hasNoOutputs(final Execution execution) {

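Two things change together in this hunk: the condition's expression becomes a typed Boolean property instead of a string sniffed for "false", and it is rendered with skipCache() because the same condition instance is evaluated against many executions, each carrying different trigger variables. A hedged sketch of why the bypass matters (the variable maps are hypothetical placeholders):

    // Without skipCache(), the second call would return the first evaluation's cached result:
    Boolean first = runContext.render(expression).skipCache().as(Boolean.class, firstExecutionVariables).orElseThrow();
    Boolean second = runContext.render(expression).skipCache().as(Boolean.class, nextExecutionVariables).orElseThrow();
    // with skipCache(), each call re-evaluates the Pebble expression against its own variables
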
@@ -19,7 +19,6 @@ import lombok.experimental.SuperBuilder;
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@EqualsAndHashCode
//@TriggersDataFilterValidation
@Schema(
    title = "Display Execution data in a dashboard chart.",
    description = "Execution data can be displayed in charts broken out by Namespace and filtered by State, for example."

@@ -208,48 +208,50 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
    return Optional.empty();
}

boolean isOutputsAllowed = runContext
    .<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
    .orElse(true);

final Output.OutputBuilder builder = Output.builder()
    .executionId(execution.getId())
    .state(execution.getState().getCurrent());

final Map<String, Object> subflowOutputs = Optional
    .ofNullable(flow.getOutputs())
    .map(outputs -> outputs
        .stream()
        .collect(Collectors.toMap(
            io.kestra.core.models.flows.Output::getId,
            io.kestra.core.models.flows.Output::getValue)
        )
    )
    .orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);

VariablesService variablesService = ((DefaultRunContext) runContext).getApplicationContext().getBean(VariablesService.class);
if (subflowOutputs != null) {
    try {
        Map<String, Object> outputs = runContext.render(subflowOutputs);
        FlowInputOutput flowInputOutput = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacky
        if (flow.getOutputs() != null && flowInputOutput != null) {
            outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
        }
        builder.outputs(outputs);
    } catch (Exception e) {
        runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
        var state = State.Type.fail(this);
        Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
        taskRun = taskRun
            .withState(state)
            .withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
            .withOutputs(variables);
if (this.wait) { // we only compute outputs if we wait for the subflow
    boolean isOutputsAllowed = runContext
        .<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
        .orElse(true);

        return Optional.of(SubflowExecutionResult.builder()
            .executionId(execution.getId())
            .state(State.Type.FAILED)
            .parentTaskRun(taskRun)
            .build());
    final Map<String, Object> subflowOutputs = Optional
        .ofNullable(flow.getOutputs())
        .map(outputs -> outputs
            .stream()
            .collect(Collectors.toMap(
                io.kestra.core.models.flows.Output::getId,
                io.kestra.core.models.flows.Output::getValue)
            )
        )
        .orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);

    if (subflowOutputs != null) {
        try {
            Map<String, Object> outputs = runContext.render(subflowOutputs);
            FlowInputOutput flowInputOutput = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacky
            if (flow.getOutputs() != null && flowInputOutput != null) {
                outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
            }
            builder.outputs(outputs);
        } catch (Exception e) {
            runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
            var state = State.Type.fail(this);
            Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
            taskRun = taskRun
                .withState(state)
                .withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
                .withOutputs(variables);

            return Optional.of(SubflowExecutionResult.builder()
                .executionId(execution.getId())
                .state(State.Type.FAILED)
                .parentTaskRun(taskRun)
                .build());
        }
    }
}

@@ -9,11 +9,11 @@ import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.codehaus.commons.nullanalysis.NotNull;

import java.util.NoSuchElementException;

@@ -103,8 +103,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {

    KVStore kvStore = runContext.namespaceKv(renderedNamespace);

    if (kvType != null && renderedValue instanceof String renderedValueStr) {
        renderedValue = switch (runContext.render(kvType).as(KVType.class).orElseThrow()) {
    if (kvType != null) {
        KVType renderedKvType = runContext.render(kvType).as(KVType.class).orElseThrow();
        if (renderedValue instanceof String renderedValueStr) {
            renderedValue = switch (renderedKvType) {
                case NUMBER -> JacksonMapper.ofJson().readValue(renderedValueStr, Number.class);
                case BOOLEAN -> Boolean.parseBoolean((String) renderedValue);
                case DATETIME, DATE -> Instant.parse(renderedValueStr);
@@ -112,7 +114,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {
                case JSON -> JacksonMapper.toObject(renderedValueStr);
                default -> renderedValue;
            };
        } else if (renderedValue instanceof Number valueNumber && renderedKvType == KVType.STRING) {
            renderedValue = valueNumber.toString();
        }
    }

    kvStore.put(renderedKey, new KVValueAndMetadata(
        new KVMetadata(

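A minimal sketch of the coercion rules above, isolated from the task so the two directions are easy to see (KVType, JacksonMapper, and java.time.Instant as in the diff; the helper name is illustrative):

    static Object coerce(Object value, KVType type) throws Exception {
        if (value instanceof String s) {
            // string input parsed into the declared type
            return switch (type) {
                case NUMBER -> JacksonMapper.ofJson().readValue(s, Number.class);
                case BOOLEAN -> Boolean.parseBoolean(s);
                case DATETIME, DATE -> Instant.parse(s);
                case JSON -> JacksonMapper.toObject(s);
                default -> value;
            };
        }
        // the symmetric case added by this hunk: a Number stored under a STRING type
        if (value instanceof Number n && type == KVType.STRING) {
            return n.toString();
        }
        return value;
    }
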
@@ -56,7 +56,8 @@ public class OverrideRetryInterceptor implements MethodInterceptor<Object, Objec
    retry.get("delay", Duration.class).orElse(Duration.ofSeconds(1)),
    retry.get("maxDelay", Duration.class).orElse(null),
    new DefaultRetryPredicate(resolveIncludes(retry, "includes"), resolveIncludes(retry, "excludes")),
    Throwable.class
    Throwable.class,
    0
);

MutableConvertibleValues<Object> attrs = context.getAttributes();

@@ -0,0 +1,11 @@
<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
<g clip-path="url(#clip0_1765_9330)">
<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
</g>
</svg>

@@ -0,0 +1,11 @@
<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
<g clip-path="url(#clip0_1765_9330)">
<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
</g>
</svg>

@@ -112,7 +112,7 @@ class JsonSchemaGeneratorTest {

    var requiredWithDefault = definitions.get("io.kestra.core.docs.JsonSchemaGeneratorTest-RequiredWithDefault");
    assertThat(requiredWithDefault, is(notNullValue()));
    assertThat((List<String>) requiredWithDefault.get("required"), not(contains("requiredWithDefault")));
    assertThat((List<String>) requiredWithDefault.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));

    var properties = (Map<String, Map<String, Object>>) flow.get("properties");
    var listeners = properties.get("listeners");
@@ -253,7 +253,7 @@ class JsonSchemaGeneratorTest {
    void requiredAreRemovedIfThereIsADefault() {
        Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, RequiredWithDefault.class);
        assertThat(generate, is(not(nullValue())));
        assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault")));
        assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
        assertThat((List<String>) generate.get("required"), containsInAnyOrder("requiredWithNoDefault"));
    }

@@ -466,6 +466,11 @@ class JsonSchemaGeneratorTest {
    @Builder.Default
    private Property<TaskWithEnum.TestClass> requiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test").build());

    @PluginProperty
    @NotNull
    @Builder.Default
    private Property<TaskWithEnum.TestClass> anotherRequiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test2").build());

    @PluginProperty
    @NotNull
    private Property<TaskWithEnum.TestClass> requiredWithNoDefault;

@@ -44,6 +44,7 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.kestra.core.models.flows.FlowScope.USER;
@@ -740,4 +741,16 @@ public abstract class AbstractExecutionRepositoryTest {
        executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
        assertThat(executions.size()).isEqualTo(0L);
    }

    @Test
    protected void shouldReturnLastExecutionsWhenInputsAreNull() {
        inject();

        List<Execution> lastExecutions = executionRepository.lastExecutions(MAIN_TENANT, null);

        assertThat(lastExecutions).isNotEmpty();
        Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet());
        assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
    }

}

@@ -387,6 +387,13 @@ public abstract class AbstractRunnerTest {
        forEachItemCaseTest.forEachItemInIf();
    }

    @Test
    @LoadFlows({"flows/valids/for-each-item-subflow-after-execution.yaml",
        "flows/valids/for-each-item-after-execution.yaml"})
    protected void forEachItemWithAfterExecution() throws Exception {
        forEachItemCaseTest.forEachItemWithAfterExecution();
    }

    @Test
    @LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
    void concurrencyCancel() throws Exception {
@@ -423,6 +430,18 @@ public abstract class AbstractRunnerTest {
        flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
    }

    @Test
    @LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
    protected void concurrencyQueueRestarted() throws Exception {
        flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
    }

    @Test
    @LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
    void concurrencyQueueAfterExecution() throws Exception {
        flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
    }

    @Test
    @ExecuteFlow("flows/valids/executable-fail.yml")
    void badExecutable(Execution execution) {

@@ -442,4 +442,22 @@ class ExecutionServiceTest {
        assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
        assertThat(killed.getState().getHistories()).hasSize(5);
    }

    @Test
    @LoadFlows({"flows/valids/change-state-errors.yaml"})
    void changeStateWithErrorBranch() throws Exception {
        Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "change-state-errors");
        Flow flow = flowRepository.findByExecution(execution);

        assertThat(execution.getTaskRunList()).hasSize(3);
        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);

        Execution restart = executionService.changeTaskRunState(execution, flow, execution.findTaskRunsByTaskId("make_error").getFirst().getId(), State.Type.SUCCESS);

        assertThat(restart.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
        assertThat(restart.getMetadata().getAttemptNumber()).isEqualTo(2);
        assertThat(restart.getState().getHistories()).hasSize(4);
        assertThat(restart.getTaskRunList()).hasSize(2);
        assertThat(restart.findTaskRunsByTaskId("make_error").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
    }
}

@@ -8,6 +8,7 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
@@ -53,6 +54,9 @@ public class FlowConcurrencyCaseTest {
    @Named(QueueFactoryInterface.EXECUTION_NAMED)
    protected QueueInterface<Execution> executionQueue;

    @Inject
    private ExecutionService executionService;

    public void flowConcurrencyCancel() throws TimeoutException, QueueException, InterruptedException {
        Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
        Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel");
@@ -278,6 +282,115 @@ public class FlowConcurrencyCaseTest {
        assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
    }

    public void flowConcurrencyQueueRestarted() throws Exception {
        Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
        Flow flow = flowRepository
            .findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", Optional.empty())
            .orElseThrow();
        Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
        executionQueue.emit(execution2);

        assertThat(execution1.getState().isRunning()).isTrue();
        assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);

        var executionResult1 = new AtomicReference<Execution>();
        var executionResult2 = new AtomicReference<Execution>();

        CountDownLatch latch1 = new CountDownLatch(2);
        AtomicReference<Execution> failedExecution = new AtomicReference<>();
        CountDownLatch latch2 = new CountDownLatch(1);
        CountDownLatch latch3 = new CountDownLatch(1);

        Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
            if (e.getLeft().getId().equals(execution1.getId())) {
                executionResult1.set(e.getLeft());
                if (e.getLeft().getState().getCurrent() == Type.FAILED) {
                    failedExecution.set(e.getLeft());
                    latch1.countDown();
                }
            }

            if (e.getLeft().getId().equals(execution2.getId())) {
                executionResult2.set(e.getLeft());
                if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
                    latch2.countDown();
                }
                if (e.getLeft().getState().getCurrent() == Type.FAILED) {
                    latch3.countDown();
                }
            }
        });

        assertTrue(latch2.await(1, TimeUnit.MINUTES));
        assertThat(failedExecution.get()).isNotNull();
        // here the first execution fails and the second is now running;
        // we restart the first one: it should be queued, then fail again.
        Execution restarted = executionService.restart(failedExecution.get(), null);
        executionQueue.emit(restarted);

        assertTrue(latch3.await(1, TimeUnit.MINUTES));
        assertTrue(latch1.await(1, TimeUnit.MINUTES));
        receive.blockLast();

        assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(Type.FAILED);
        // it should have been queued after being restarted
        assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED)).isTrue();
        assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.QUEUED)).isTrue();
        assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(Type.FAILED);
        assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
        assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
        assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
    }

    public void flowConcurrencyQueueAfterExecution() throws TimeoutException, QueueException, InterruptedException {
        Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
        Flow flow = flowRepository
            .findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", Optional.empty())
            .orElseThrow();
        Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
        executionQueue.emit(execution2);

        assertThat(execution1.getState().isRunning()).isTrue();
        assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);

        var executionResult1 = new AtomicReference<Execution>();
        var executionResult2 = new AtomicReference<Execution>();

        CountDownLatch latch1 = new CountDownLatch(1);
        CountDownLatch latch2 = new CountDownLatch(1);
        CountDownLatch latch3 = new CountDownLatch(1);

        Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
            if (e.getLeft().getId().equals(execution1.getId())) {
                executionResult1.set(e.getLeft());
                if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
                    latch1.countDown();
                }
            }

            if (e.getLeft().getId().equals(execution2.getId())) {
                executionResult2.set(e.getLeft());
                if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
                    latch2.countDown();
                }
                if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
                    latch3.countDown();
                }
            }
        });

        assertTrue(latch1.await(1, TimeUnit.MINUTES));
        assertTrue(latch2.await(1, TimeUnit.MINUTES));
        assertTrue(latch3.await(1, TimeUnit.MINUTES));
        receive.blockLast();

        assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
        assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
        assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
    }

    private URI storageUpload() throws URISyntaxException, IOException {
        File tempFile = File.createTempFile("file", ".txt");

@@ -83,4 +83,24 @@ class RunContextPropertyTest {
        runContextProperty = new RunContextProperty<>(Property.<Map<String, String>>builder().expression("{ \"key\": \"{{ key }}\"}").build(), runContext);
        assertThat(runContextProperty.asMap(String.class, String.class, Map.of("key", "value"))).containsEntry("key", "value");
    }

    @Test
    void asShouldReturnCachedRenderedProperty() throws IllegalVariableEvaluationException {
        var runContext = runContextFactory.of();

        var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);

        assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
        assertThat(runContextProperty.as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value1"));
    }

    @Test
    void asShouldNotReturnCachedRenderedPropertyWithSkipCache() throws IllegalVariableEvaluationException {
        var runContext = runContextFactory.of();

        var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);

        assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
        assertThat(runContextProperty.skipCache().as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value2"));
    }
}

@@ -140,7 +140,7 @@ class RunContextTest {
    List<LogEntry> logs = new CopyOnWriteArrayList<>();
    Flux<LogEntry> receive = TestsUtils.receive(workerTaskLogQueue, either -> logs.add(either.getLeft()));

    char[] chars = new char[1024 * 11];
    char[] chars = new char[1024 * 16];
    Arrays.fill(chars, 'a');

    Map<String, Object> inputs = new HashMap<>(InputsTest.inputs);

@@ -5,6 +5,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.queues.QueueException;
@@ -18,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

@@ -77,8 +79,12 @@ public class TaskCacheTest {
    @Plugin
    public static class CounterTask extends Task implements RunnableTask<CounterTask.Output> {

        private String workingDir;

        @Override
        public Output run(RunContext runContext) throws Exception {
            Map<String, Object> variables = Map.of("workingDir", runContext.workingDir().path().toString());
            runContext.render(this.workingDir, variables);
            return Output.builder()
                .counter(COUNTER.incrementAndGet())
                .build();

@@ -372,4 +372,44 @@ class FlowServiceTest {

        assertThat(exceptions.size()).isZero();
    }

    @Test
    void shouldReturnValidationForRunnablePropsOnFlowable() {
        // Given
        String source = """
            id: dolphin_164914
            namespace: company.team

            tasks:
              - id: for
                type: io.kestra.plugin.core.flow.ForEach
                values: [1, 2, 3]
                workerGroup:
                  key: toto
                timeout: PT10S
                taskCache:
                  enabled: true
                tasks:
                  - id: hello
                    type: io.kestra.plugin.core.log.Log
                    message: Hello World! 🚀
                    workerGroup:
                      key: toto
                    timeout: PT10S
                    taskCache:
                      enabled: true
            """;

        // When
        List<ValidateConstraintViolation> results = flowService.validate("my-tenant", source);

        // Then
        assertThat(results).hasSize(1);
        assertThat(results.getFirst().getWarnings()).hasSize(3);
        assertThat(results.getFirst().getWarnings()).containsExactlyInAnyOrder(
            "The task 'for' cannot use the 'timeout' property as it's only relevant for runnable tasks.",
            "The task 'for' cannot use the 'taskCache' property as it's only relevant for runnable tasks.",
            "The task 'for' cannot use the 'workerGroup' property as it's only relevant for runnable tasks."
        );
    }
}

@@ -0,0 +1,142 @@
package io.kestra.core.services;

import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.core.log.Log;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Optional;

import static io.kestra.core.repositories.AbstractFlowRepositoryTest.TEST_NAMESPACE;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;

@KestraTest
class FlowTriggerServiceTest {
    public static final List<Label> EMPTY_LABELS = List.of();
    public static final Optional<MultipleConditionStorageInterface> EMPTY_MULTIPLE_CONDITION_STORAGE = Optional.empty();

    @Inject
    private TestRunContextFactory runContextFactory;
    @Inject
    private ConditionService conditionService;
    @Inject
    private FlowService flowService;
    private FlowTriggerService flowTriggerService;

    @BeforeEach
    void setUp() {
        flowTriggerService = new FlowTriggerService(conditionService, runContextFactory, flowService);
    }

    @Test
    void computeExecutionsFromFlowTriggers_ok() {
        var simpleFlow = aSimpleFlow();
        var flowWithFlowTrigger = Flow.builder()
            .id("flow-with-flow-trigger")
            .namespace(TEST_NAMESPACE)
            .tenantId(MAIN_TENANT)
            .tasks(List.of(simpleLogTask()))
            .triggers(List.of(
                flowTriggerWithNoConditions()
            ))
            .build();

        var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.SUCCESS);

        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
            simpleFlowExecution,
            List.of(simpleFlow, flowWithFlowTrigger),
            EMPTY_MULTIPLE_CONDITION_STORAGE
        );

        assertThat(resultingExecutionsToRun).size().isEqualTo(1);
        assertThat(resultingExecutionsToRun.get(0).getFlowId()).isEqualTo(flowWithFlowTrigger.getId());
    }

    @Test
    void computeExecutionsFromFlowTriggers_filteringOutCreatedExecutions() {
        var simpleFlow = aSimpleFlow();
        var flowWithFlowTrigger = Flow.builder()
            .id("flow-with-flow-trigger")
            .namespace(TEST_NAMESPACE)
            .tenantId(MAIN_TENANT)
            .tasks(List.of(simpleLogTask()))
            .triggers(List.of(
                flowTriggerWithNoConditions()
            ))
            .build();

        var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.CREATED);

        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
            simpleFlowExecution,
            List.of(simpleFlow, flowWithFlowTrigger),
            EMPTY_MULTIPLE_CONDITION_STORAGE
        );

        assertThat(resultingExecutionsToRun).size().isEqualTo(0);
    }

    @Test
    void computeExecutionsFromFlowTriggers_filteringOutTestExecutions() {
        var simpleFlow = aSimpleFlow();
        var flowWithFlowTrigger = Flow.builder()
            .id("flow-with-flow-trigger")
            .namespace(TEST_NAMESPACE)
            .tenantId(MAIN_TENANT)
            .tasks(List.of(simpleLogTask()))
            .triggers(List.of(
                flowTriggerWithNoConditions()
            ))
            .build();

        var simpleFlowExecutionComingFromATest = Execution.newExecution(simpleFlow, EMPTY_LABELS)
            .withState(State.Type.SUCCESS)
            .toBuilder()
            .kind(ExecutionKind.TEST)
            .build();

        var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
            simpleFlowExecutionComingFromATest,
            List.of(simpleFlow, flowWithFlowTrigger),
            EMPTY_MULTIPLE_CONDITION_STORAGE
        );

        assertThat(resultingExecutionsToRun).size().isEqualTo(0);
    }

    private static Flow aSimpleFlow() {
        return Flow.builder()
            .id("simple-flow")
            .namespace(TEST_NAMESPACE)
            .tenantId(MAIN_TENANT)
            .tasks(List.of(simpleLogTask()))
            .build();
    }

    private static io.kestra.plugin.core.trigger.Flow flowTriggerWithNoConditions() {
        return io.kestra.plugin.core.trigger.Flow.builder()
            .id("flowTrigger")
            .type(io.kestra.plugin.core.trigger.Flow.class.getName())
            .build();
    }

    private static Log simpleLogTask() {
        return Log.builder()
            .id(IdUtils.create())
            .type(Log.class.getName())
            .message("Hello World")
            .build();
    }
}

@@ -372,6 +372,51 @@ public class ForEachItemCaseTest {
        assertThat(correlationId.get().value()).isEqualTo(execution.getId());
    }

    public void forEachItemWithAfterExecution() throws TimeoutException, InterruptedException, URISyntaxException, IOException, QueueException {
        CountDownLatch countDownLatch = new CountDownLatch(26);
        AtomicReference<Execution> triggered = new AtomicReference<>();

        Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
            Execution execution = either.getLeft();
            if (execution.getFlowId().equals("for-each-item-subflow-after-execution") && execution.getState().getCurrent().isTerminated()) {
                triggered.set(execution);
                countDownLatch.countDown();
            }
        });

        URI file = storageUpload();
        Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
        Execution execution = runnerUtils.runOne(MAIN_TENANT, TEST_NAMESPACE, "for-each-item-after-execution", null,
            (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs),
            Duration.ofSeconds(30));

        // we should have triggered 26 subflows
        assertThat(countDownLatch.await(1, TimeUnit.MINUTES)).isTrue();
        receive.blockLast();

        // assert on the main flow execution
        assertThat(execution.getTaskRunList()).hasSize(5);
        assertThat(execution.getTaskRunList().get(2).getAttempts()).hasSize(1);
        assertThat(execution.getTaskRunList().get(2).getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        Map<String, Object> outputs = execution.getTaskRunList().get(2).getOutputs();
        assertThat(outputs.get("numberOfBatches")).isEqualTo(26);
        assertThat(outputs.get("iterations")).isNotNull();
        Map<String, Integer> iterations = (Map<String, Integer>) outputs.get("iterations");
        assertThat(iterations.get("CREATED")).isZero();
        assertThat(iterations.get("RUNNING")).isZero();
        assertThat(iterations.get("SUCCESS")).isEqualTo(26);

        // assert on the last subflow execution
        assertThat(triggered.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        assertThat(triggered.get().getFlowId()).isEqualTo("for-each-item-subflow-after-execution");
        assertThat((String) triggered.get().getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-after-execution/executions/.*/tasks/each-split/.*\\.txt");
        assertThat(triggered.get().getTaskRunList()).hasSize(2);
        Optional<Label> correlationId = triggered.get().getLabels().stream().filter(label -> label.key().equals(Label.CORRELATION_ID)).findAny();
        assertThat(correlationId.isPresent()).isTrue();
        assertThat(correlationId.get().value()).isEqualTo(execution.getId());
    }

    private URI storageUpload() throws URISyntaxException, IOException {
        File tempFile = File.createTempFile("file", ".txt");

@@ -58,4 +58,15 @@ class ParallelTest {
        assertThat(execution.findTaskRunsByTaskId("a2").getFirst().getState().getStartDate().isAfter(execution.findTaskRunsByTaskId("a1").getFirst().getState().getEndDate().orElseThrow())).isTrue();
        assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getStartDate().isAfter(execution.findTaskRunsByTaskId("e1").getFirst().getState().getEndDate().orElseThrow())).isTrue();
    }

    @Test
    @ExecuteFlow("flows/valids/parallel-fail-with-flowable.yaml")
    void parallelFailWithFlowable(Execution execution) {
        assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
        assertThat(execution.getTaskRunList()).hasSize(5);
        // all tasks must be terminated except the Sleep, which will end later as everything is concurrent
        execution.getTaskRunList().stream()
            .filter(taskRun -> !"sleep".equals(taskRun.getTaskId()))
            .forEach(run -> assertThat(run.getState().isTerminated()).isTrue());
    }
}

@@ -4,16 +4,24 @@ import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.RunnerUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

@KestraTest(startRunner = true)
class SubflowRunnerTest {
@@ -24,6 +32,10 @@ class SubflowRunnerTest {
    @Inject
    private ExecutionRepositoryInterface executionRepository;

    @Inject
    @Named(QueueFactoryInterface.EXECUTION_NAMED)
    protected QueueInterface<Execution> executionQueue;

    @Test
    @LoadFlows({"flows/valids/subflow-inherited-labels-child.yaml", "flows/valids/subflow-inherited-labels-parent.yaml"})
    void inheritedLabelsAreOverridden() throws QueueException, TimeoutException {
@@ -50,4 +62,29 @@ class SubflowRunnerTest {
            new Label("parentFlowLabel2", "value2") // inherited from the parent flow
        );
    }

    @Test
    @LoadFlows({"flows/valids/subflow-parent-no-wait.yaml", "flows/valids/subflow-child-with-output.yaml"})
    void subflowOutputWithoutWait() throws QueueException, TimeoutException, InterruptedException {
        AtomicReference<Execution> childExecution = new AtomicReference<>();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Runnable closing = executionQueue.receive(either -> {
            if (either.isLeft() && either.getLeft().getFlowId().equals("subflow-child-with-output") && either.getLeft().getState().isTerminated()) {
                childExecution.set(either.getLeft());
                countDownLatch.countDown();
            }
        });

        Execution parentExecution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-parent-no-wait");
        String childExecutionId = (String) parentExecution.findTaskRunsByTaskId("subflow").getFirst().getOutputs().get("executionId");
        assertThat(childExecutionId).isNotBlank();
        assertThat(parentExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        assertThat(parentExecution.getTaskRunList()).hasSize(1);

        assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
        assertThat(childExecution.get().getId()).isEqualTo(childExecutionId);
        assertThat(childExecution.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
        assertThat(childExecution.get().getTaskRunList()).hasSize(1);
        closing.run();
    }
}

@@ -206,14 +206,20 @@ class SetTest {

        kv = createAndPerformSetTask("[{\"some\":\"value\"},{\"another\":\"value\"}]", KVType.JSON);
        assertThat(kv.getValue(TEST_KEY).orElseThrow().value()).isEqualTo(List.of(Map.of("some", "value"), Map.of("another", "value")));

        kv = createAndPerformSetTask("{{ 200 }}", KVType.STRING);
        assertThat(kv.getValue(TEST_KEY).orElseThrow().value()).isEqualTo("200");

        kv = createAndPerformSetTask("{{ 200.1 }}", KVType.STRING);
        assertThat(kv.getValue(TEST_KEY).orElseThrow().value()).isEqualTo("200.1");
    }

    private KVStore createAndPerformSetTask(String value, KVType type) throws Exception {
        Set set = Set.builder()
            .id(Set.class.getSimpleName())
            .type(Set.class.getName())
            .key(new Property<>(TEST_KEY))
            .value(new Property<>(value))
            .key(Property.ofValue(TEST_KEY))
            .value(value.contains("{{") ? Property.ofExpression(value) : Property.ofValue(value))
            .kvType(Property.ofValue(type))
            .build();
        final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, set, null);

@@ -4,6 +4,7 @@ namespace: io.kestra.tests
tasks:
  - id: cache
    type: io.kestra.core.runners.TaskCacheTest$CounterTask
    workingDir: "{{workingDir}}"
    taskCache:
      enabled: true
      ttl: PT1S
@@ -0,0 +1,18 @@
id: change-state-errors
namespace: io.kestra.tests

errors:
  - id: print_error_log
    type: io.kestra.plugin.core.log.Log
    message: "Failure alert for flow {{ flow.namespace }}.{{ flow.id }} with ID {{ execution.id }}"

tasks:
  - id: before_task
    type: io.kestra.plugin.core.debug.Return
    format: "this always works"
  - id: make_error
    type: io.kestra.plugin.core.debug.Return
    format: "{{error}}"
  - id: after-task
    type: io.kestra.plugin.core.debug.Return
    format: "after"
@@ -0,0 +1,17 @@
id: flow-concurrency-queue-after-execution
namespace: io.kestra.tests

concurrency:
  behavior: QUEUE
  limit: 1

tasks:
  - id: sleep
    type: io.kestra.plugin.core.flow.Sleep
    duration: PT2S

afterExecution:
  - id: afterExecution
    type: io.kestra.plugin.core.output.OutputValues
    values:
      some: value
@@ -0,0 +1,13 @@
id: flow-concurrency-queue-fail
namespace: io.kestra.tests

concurrency:
  behavior: QUEUE
  limit: 1

tasks:
  - id: sleep
    type: io.kestra.plugin.core.flow.Sleep
    duration: PT2S
  - id: fail
    type: io.kestra.plugin.core.execution.Fail
@@ -0,0 +1,26 @@
id: for-each-item-after-execution
namespace: io.kestra.tests

inputs:
  - id: file
    type: FILE
  - id: batch
    type: INT

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ inputs.file }}"
    batch:
      rows: "{{inputs.batch}}"
    namespace: io.kestra.tests
    flowId: for-each-item-subflow-after-execution
    wait: true
    transmitFailed: true
    inputs:
      items: "{{ taskrun.items }}"

afterExecution:
  - id: afterExecution
    type: io.kestra.plugin.core.log.Log
    message: Hello from afterExecution!
@@ -0,0 +1,16 @@
id: for-each-item-subflow-after-execution
namespace: io.kestra.tests

inputs:
  - id: items
    type: STRING

tasks:
  - id: per-item
    type: io.kestra.plugin.core.log.Log
    message: "{{ inputs.items }}"

afterExecution:
  - id: afterExecution
    type: io.kestra.plugin.core.log.Log
    message: Hello from afterExecution!
@@ -0,0 +1,28 @@
id: parallel-fail-with-flowable
namespace: io.kestra.tests

inputs:
  - id: user
    type: STRING
    defaults: Rick Astley

tasks:
  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: if-1
        type: io.kestra.plugin.core.flow.If
        condition: "{{ inputs.user == 'Rick Astley'}}"
        then:
          - id: sleep
            type: io.kestra.plugin.core.flow.Sleep
            duration: PT1S

      - id: if-2
        type: io.kestra.plugin.core.flow.If
        condition: "{{ inputs.user == 'Rick Astley'}}"
        then:
          - id: fail_missing_variable
            type: io.kestra.plugin.core.log.Log
            message: "{{ vars.nonexistent_variable }}"
@@ -0,0 +1,12 @@
id: subflow-child-with-output
namespace: io.kestra.tests

tasks:
  - id: return
    type: io.kestra.plugin.core.debug.Return
    format: "Some value"

outputs:
  - id: flow_a_output
    type: STRING
    value: "{{ outputs.return.value }}"
@@ -0,0 +1,9 @@
id: subflow-parent-no-wait
namespace: io.kestra.tests

tasks:
  - id: subflow
    type: io.kestra.plugin.core.flow.Subflow
    namespace: io.kestra.tests
    flowId: subflow-child-with-output
    wait: false
@@ -1,6 +1,6 @@
version=0.24.0-SNAPSHOT
version=0.24.5

org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
org.gradle.parallel=true
org.gradle.caching=true
org.gradle.priority=low
org.gradle.priority=low

@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.Variables;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.UnsupportedMessageException;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.runner.JdbcQueueTest;
@@ -31,7 +32,8 @@ class PostgresQueueTest extends JdbcQueueTest {
            .build();

        var exception = assertThrows(QueueException.class, () -> workerTaskResultQueue.emit(workerTaskResult));
        assertThat(exception.getMessage()).isEqualTo("Unable to emit a message to the queue");
        assertThat(exception).isInstanceOf(UnsupportedMessageException.class);
        assertThat(exception.getMessage()).contains("ERROR: unsupported Unicode escape sequence");
        assertThat(exception.getCause()).isInstanceOf(DataException.class);
    }
}
@@ -869,8 +869,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi

    @Override
    public List<Execution> lastExecutions(
        @Nullable String tenantId,
        List<FlowFilter> flows
        String tenantId,
        @Nullable List<FlowFilter> flows
    ) {
        return this.jdbcRepository
            .getDslContextWrapper()
@@ -892,14 +892,19 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
                .and(NORMAL_KIND_CONDITION)
                .and(field("end_date").isNotNull())
                .and(DSL.or(
                    flows
                        .stream()
                        .map(flow -> DSL.and(
                            field("namespace").eq(flow.getNamespace()),
                            field("flow_id").eq(flow.getId())
                        ))
                        .toList()
                ));
                    ListUtils.emptyOnNull(flows).isEmpty() ?
                        DSL.trueCondition()
                        :
                        DSL.or(
                            flows.stream()
                                .map(flow -> DSL.and(
                                    field("namespace").eq(flow.getNamespace()),
                                    field("flow_id").eq(flow.getId())
                                ))
                                .toList()
                        )
                )
            );

        Table<Record2<Object, Integer>> cte = subquery.asTable("cte");

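The rewritten filter is easier to read when extracted: with a null or empty `flows` list the query now matches every flow instead of building an empty `DSL.or(...)`. A sketch of the same condition pulled into a local variable, using the same jOOQ calls as the diff (illustrative only):

    // Sketch: a null/empty filter matches everything; otherwise OR the (namespace, flow_id) pairs.
    Condition flowsCondition = ListUtils.emptyOnNull(flows).isEmpty()
        ? DSL.trueCondition()
        : DSL.or(flows.stream()
            .map(flow -> DSL.and(
                field("namespace").eq(flow.getNamespace()),
                field("flow_id").eq(flow.getId())))
            .toList());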
@@ -20,6 +20,12 @@ public class AbstractJdbcExecutionRunningStorage extends AbstractJdbcRepository
        this.jdbcRepository = jdbcRepository;
    }

    public void save(ExecutionRunning executionRunning) {
        jdbcRepository.getDslContextWrapper().transaction(
            configuration -> save(DSL.using(configuration), executionRunning)
        );
    }

    public void save(DSLContext dslContext, ExecutionRunning executionRunning) {
        Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executionRunning);
        this.jdbcRepository.persist(executionRunning, dslContext, fields);

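The new no-context `save` overload simply wraps the existing `DSLContext` variant in its own transaction, so callers that do not already participate in one (like the queued-execution pop further down) can persist in a single call. Hedged usage sketches (variable names are illustrative):

    // Standalone: opens its own transaction.
    executionRunningStorage.save(executionRunning);

    // Inside a transaction a caller already manages (dslContextWrapper is illustrative).
    dslContextWrapper.transaction(configuration ->
        executionRunningStorage.save(DSL.using(configuration), executionRunning));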
@@ -72,6 +72,8 @@ public class JdbcExecutor implements ExecutorInterface, Service {
    private static final ObjectMapper MAPPER = JdbcMapper.of();

    private final ScheduledExecutorService scheduledDelay = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> executionDelayFuture;
    private ScheduledFuture<?> monitorSLAFuture;

    @Inject
    private AbstractJdbcExecutionRepository executionRepository;
@@ -312,14 +314,14 @@ public class JdbcExecutor implements ExecutorInterface, Service {
        this.receiveCancellations.addFirst(this.executionRunningQueue.receive(Executor.class, this::executionRunningQueue));
        this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));

        ScheduledFuture<?> scheduledDelayFuture = scheduledDelay.scheduleAtFixedRate(
        executionDelayFuture = scheduledDelay.scheduleAtFixedRate(
            this::executionDelaySend,
            0,
            1,
            TimeUnit.SECONDS
        );

        ScheduledFuture<?> scheduledSLAMonitorFuture = scheduledDelay.scheduleAtFixedRate(
        monitorSLAFuture = scheduledDelay.scheduleAtFixedRate(
            this::executionSLAMonitor,
            0,
            1,
@@ -329,11 +331,13 @@ public class JdbcExecutor implements ExecutorInterface, Service {
        // look at exceptions on the scheduledDelay thread
        Thread.ofVirtual().name("jdbc-delay-exception-watcher").start(
            () -> {
                Await.until(scheduledDelayFuture::isDone);
                Await.until(executionDelayFuture::isDone);

                try {
                    scheduledDelayFuture.get();
                } catch (ExecutionException | InterruptedException | CancellationException e) {
                    executionDelayFuture.get();
                } catch (CancellationException ignored) {

                } catch (ExecutionException | InterruptedException e) {
                    if (e.getCause() != null && e.getCause().getClass() != CannotCreateTransactionException.class) {
                        log.error("Executor fatal exception in the scheduledDelay thread", e);
                        close();
@@ -346,11 +350,13 @@ public class JdbcExecutor implements ExecutorInterface, Service {
        // look at exceptions on the scheduledSLAMonitorFuture thread
        Thread.ofVirtual().name("jdbc-sla-monitor-exception-watcher").start(
            () -> {
                Await.until(scheduledSLAMonitorFuture::isDone);
                Await.until(monitorSLAFuture::isDone);

                try {
                    scheduledSLAMonitorFuture.get();
                } catch (ExecutionException | InterruptedException | CancellationException e) {
                    monitorSLAFuture.get();
                } catch (CancellationException ignored) {

                } catch (ExecutionException | InterruptedException e) {
                    if (e.getCause() != null && e.getCause().getClass() != CannotCreateTransactionException.class) {
                        log.error("Executor fatal exception in the scheduledSLAMonitor thread", e);
                        close();
@@ -546,7 +552,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
        }

        // create an SLA monitor if needed
        if (execution.getState().getCurrent() == State.Type.CREATED && !ListUtils.isEmpty(flow.getSla())) {
        if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && !ListUtils.isEmpty(flow.getSla())) {
            List<SLAMonitor> monitors = flow.getSla().stream()
                .filter(ExecutionMonitoringSLA.class::isInstance)
                .map(ExecutionMonitoringSLA.class::cast)
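This hunk and the concurrency hunk just below widen the same guard: besides freshly CREATED executions, an execution that went FAILED and was then restarted now also gets its SLA monitors and its concurrency slot re-created. Extracted as a predicate for readability (a sketch; `failedThenRestarted` is the helper used in the diff, everything else is illustrative):

    // Sketch: the condition now shared by SLA-monitor creation and concurrency handling.
    private static boolean isInitialOrRestarted(Execution execution) {
        return execution.getState().getCurrent() == State.Type.CREATED
            || execution.getState().failedThenRestarted();
    }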
@@ -562,7 +568,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {

        // handle the concurrency limit; we need to use a different queue to be sure that running executions
        // are processed sequentially, i.e. inside a queue with no parallelism
        if (execution.getState().getCurrent() == State.Type.CREATED && flow.getConcurrency() != null) {
        if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && flow.getConcurrency() != null) {
            ExecutionRunning executionRunning = ExecutionRunning.builder()
                .tenantId(executor.getFlow().getTenantId())
                .namespace(executor.getFlow().getNamespace())
@@ -1065,7 +1071,11 @@ public class JdbcExecutor implements ExecutorInterface, Service {
            executorService.log(log, false, executor);
        }

        // the terminated state can only come from the execution queue, in this case we always have a flow in the executor
        // the terminated state can come from the execution queue, in this case we always have a flow in the executor
        // or from a worker task in an afterExecution block, in this case we need to load the flow
        if (executor.getFlow() == null && executor.getExecution().getState().isTerminated()) {
            executor = executor.withFlow(findFlow(executor.getExecution()));
        }
        boolean isTerminated = executor.getFlow() != null && executionService.isTerminated(executor.getFlow(), executor.getExecution());

        // purge the executionQueue
@@ -1121,8 +1131,16 @@ public class JdbcExecutor implements ExecutorInterface, Service {
            executor.getFlow().getId(),
            throwConsumer(queued -> {
                var newExecution = queued.withState(State.Type.RUNNING);
                metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
                ExecutionRunning executionRunning = ExecutionRunning.builder()
                    .tenantId(newExecution.getTenantId())
                    .namespace(newExecution.getNamespace())
                    .flowId(newExecution.getFlowId())
                    .execution(newExecution)
                    .concurrencyState(ExecutionRunning.ConcurrencyState.RUNNING)
                    .build();
                executionRunningStorage.save(executionRunning);
                executionQueue.emit(newExecution);
                metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
            })
        );
    }
@@ -1207,13 +1225,13 @@ public class JdbcExecutor implements ExecutorInterface, Service {
        try {
            // Handle paused tasks
            if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESUME_FLOW) && !pair.getLeft().getState().isTerminated()) {
                FlowInterface flow = flowMetaStore.findByExecution(pair.getLeft()).orElseThrow();
                if (executionDelay.getTaskRunId() == null) {
                    // if taskRunId is null, this means we restart a flow that was delayed at startup (scheduled on)
                    Execution markAsExecution = pair.getKey().withState(executionDelay.getState());
                    executor = executor.withExecution(markAsExecution, "pausedRestart");
                } else {
                    // if there is a taskRun it means we restart a paused task
                    FlowInterface flow = flowMetaStore.findByExecution(pair.getLeft()).orElseThrow();
                    Execution markAsExecution = executionService.markAs(
                        pair.getKey(),
                        flow,
@@ -1362,13 +1380,11 @@ public class JdbcExecutor implements ExecutorInterface, Service {
    private Executor handleFailedExecutionFromExecutor(Executor executor, Exception e) {
        Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);

        failedExecutionWithLog.getLogs().forEach(log -> {
            try {
                logQueue.emitAsync(log);
            } catch (QueueException ex) {
                // fail silently
            }
        });
        try {
            logQueue.emitAsync(failedExecutionWithLog.getLogs());
        } catch (QueueException ex) {
            // fail silently
        }

        return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
    }
@@ -1386,7 +1402,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {

        setState(ServiceState.TERMINATING);
        this.receiveCancellations.forEach(Runnable::run);
        scheduledDelay.shutdown();
        ExecutorsUtils.closeScheduledThreadPool(scheduledDelay, Duration.ofSeconds(5), List.of(executionDelayFuture, monitorSLAFuture));
        setState(ServiceState.TERMINATED_GRACEFULLY);

        if (log.isDebugEnabled()) {
@@ -1423,4 +1439,4 @@ public class JdbcExecutor implements ExecutorInterface, Service {
    public ServiceState getState() {
        return state.get();
    }
}
}

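`ExecutorsUtils.closeScheduledThreadPool` itself is not shown in this diff; judging only from its call site in `close()`, a plausible shape is the following (an assumption for illustration, not the actual kestra implementation):

    // Assumed sketch: cancel the recurring futures, then drain the pool within the timeout.
    public static void closeScheduledThreadPool(ScheduledExecutorService pool, Duration timeout, List<ScheduledFuture<?>> futures) {
        futures.stream().filter(Objects::nonNull).forEach(future -> future.cancel(false)); // let in-flight runs finish
        pool.shutdown();
        try {
            if (!pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
    }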
@@ -7,16 +7,13 @@ import com.google.common.collect.Iterables;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.queues.*;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JdbcTableConfigs;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.core.queues.MessageTooBigException;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
@@ -42,6 +39,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwRunnable;

@Slf4j
@@ -151,6 +149,11 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
                    .execute();
            });
        } catch (DataException e) { // the exception comes from the data itself, not the database/network/driver, so instead of failing fast we throw a recoverable QueueException
            // Postgres refuses to store JSONB with the '\0000' codepoint as it has no textual representation.
            // We try to detect that and fail with a specific exception so the Worker can recover from it.
            if (e.getMessage() != null && e.getMessage().contains("ERROR: unsupported Unicode escape sequence")) {
                throw new UnsupportedMessageException(e.getMessage(), e);
            }
            throw new QueueException("Unable to emit a message to the queue", e);
        }

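Because `UnsupportedMessageException` extends `QueueException` (the PostgresQueueTest above asserts both on the same thrown exception), a caller can separate the unrecoverable-data case from infrastructure failures. A hedged caller-side sketch (the variable names are illustrative):

    try {
        workerTaskResultQueue.emit(workerTaskResult);
    } catch (UnsupportedMessageException e) {
        // the message itself cannot be stored (e.g. '\u0000' inside JSONB): drop or sanitize it, then continue
    } catch (QueueException e) {
        // database/network/driver problem: propagate or retry
        throw e;
    }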
@@ -171,8 +174,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
    }

    @Override
    public void emitAsync(String consumerGroup, T message) throws QueueException {
        this.asyncPoolExecutor.submit(throwRunnable(() -> this.emit(consumerGroup, message)));
    public void emitAsync(String consumerGroup, List<T> messages) throws QueueException {
        this.asyncPoolExecutor.submit(throwRunnable(() -> messages.forEach(throwConsumer(message -> this.emit(consumerGroup, message)))));
    }

    @Override

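Batching matters here for ordering: the old single-message `emitAsync` submitted one pool task per message, so two messages from the same caller could be emitted in either order, while the new list overload serializes the whole batch inside one pool task. A sketch of the difference (`logs` is an illustrative name):

    // Old shape: one async task per message; relative order across the batch is not guaranteed.
    for (LogEntry log : logs) {
        logQueue.emitAsync(log);
    }
    // New shape: one async task for the whole batch, emitted in list order
    // (this is what handleFailedExecutionFromExecutor above now relies on).
    logQueue.emitAsync(logs);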
@@ -27,7 +27,7 @@ dependencies {
    // as Jackson is in the Micronaut BOM, to force its version we need to use enforcedPlatform but it didn't really work, see later :(
    api enforcedPlatform("com.fasterxml.jackson:jackson-bom:$jacksonVersion")
    api enforcedPlatform("org.slf4j:slf4j-api:$slf4jVersion")
    api platform("io.micronaut.platform:micronaut-platform:4.8.2")
    api platform("io.micronaut.platform:micronaut-platform:4.9.2")
    api platform("io.qameta.allure:allure-bom:2.29.1")
    // we define cloud bom here for GCP, Azure and AWS so they are aligned for all plugins that use them (secret, storage, oss and ee plugins)
    api platform('com.google.cloud:libraries-bom:26.64.0')
@@ -148,4 +148,4 @@ dependencies {
    api "io.kestra:runner-memory:$version"
    api "io.kestra:storage-local:$version"
}
}
}

@@ -38,6 +38,10 @@ public abstract class AbstractTaskRunnerTest {
    @Test
    protected void run() throws Exception {
        var runContext = runContext(this.runContextFactory);
        simpleRun(runContext);
    }

    private void simpleRun(RunContext runContext) throws Exception {
        var commands = initScriptCommands(runContext);
        Mockito.when(commands.getCommands()).thenReturn(
            Property.ofValue(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'")))
@@ -166,6 +170,13 @@ public abstract class AbstractTaskRunnerTest {
        assertThat(taskException.getLogConsumer().getOutputs().get("logOutput")).isEqualTo("Hello World");
    }

    @Test
    protected void canWorkMultipleTimeInSameWdir() throws Exception {
        var runContext = runContext(this.runContextFactory);
        simpleRun(runContext);
        simpleRun(runContext);
    }

    protected RunContext runContext(RunContextFactory runContextFactory) {
        return this.runContext(runContextFactory, null);
    }
@@ -236,4 +247,4 @@ public abstract class AbstractTaskRunnerTest {
    protected boolean needsToSpecifyWorkingDirectory() {
        return false;
    }
}
}

ui/package-lock.json (generated, 14 changes)
@@ -10,7 +10,7 @@
    "hasInstallScript": true,
    "dependencies": {
        "@js-joda/core": "^5.6.5",
        "@kestra-io/ui-libs": "^0.0.228",
        "@kestra-io/ui-libs": "^0.0.232",
        "@vue-flow/background": "^1.3.2",
        "@vue-flow/controls": "^1.1.2",
        "@vue-flow/core": "^1.45.0",
@@ -1792,9 +1792,9 @@
    }
    },
    "node_modules/@eslint/plugin-kit": {
        "version": "0.3.3",
        "resolved": "https://registry.npmjs.org/@eslint/plugin-kit/-/plugin-kit-0.3.3.tgz",
        "integrity": "sha512-1+WqvgNMhmlAambTvT3KPtCl/Ibr68VldY2XY40SL1CE0ZXiakFR/cbTspaF5HsnpDMvcYYoJHfl4980NBjGag==",
        "version": "0.3.4",
        "resolved": "https://registry.npmjs.org/@eslint/plugin-kit/-/plugin-kit-0.3.4.tgz",
        "integrity": "sha512-Ul5l+lHEcw3L5+k8POx6r74mxEYKG5kOb6Xpy2gCRW6zweT6TEhAf8vhxGgjhqrd/VO/Dirhsb+1hNpD1ue9hw==",
        "dev": true,
        "license": "Apache-2.0",
        "dependencies": {
@@ -3133,9 +3133,9 @@
        "license": "BSD-3-Clause"
    },
    "node_modules/@kestra-io/ui-libs": {
        "version": "0.0.228",
        "resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.228.tgz",
        "integrity": "sha512-ZSUpBEhTJ7Ul0QtMU/ioDlgryoVwZv/BD1ko96q+m9sCA4Uab1yi2LUf+ZpEEzZWH8r37E/CNK6HNjG+tei7eA==",
        "version": "0.0.232",
        "resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.232.tgz",
        "integrity": "sha512-4Z1DNxWEZSEEy2Tv63uNf2remxb/IqVUY01/qCaeYjLcp5axrS7Dn43N8DspA4EPdlhe4JFq2RhG13Pom+JDQA==",
        "dependencies": {
            "@nuxtjs/mdc": "^0.16.1",
            "@popperjs/core": "^2.11.8",

@@ -24,7 +24,7 @@
    },
    "dependencies": {
        "@js-joda/core": "^5.6.5",
        "@kestra-io/ui-libs": "^0.0.228",
        "@kestra-io/ui-libs": "^0.0.232",
        "@vue-flow/background": "^1.3.2",
        "@vue-flow/controls": "^1.1.2",
        "@vue-flow/core": "^1.45.0",
@@ -149,7 +149,7 @@
        "@popperjs/core": "npm:@sxzz/popperjs-es@^2.11.7"
    },
    "el-table-infinite-scroll": {
        "vue": "$vue"
        "vue": "^3.5.18"
    },
    "storybook": "$storybook"
},

@@ -25,4 +25,4 @@ const ANIMALS: string[] = [
const getRandomNumber = (minimum: number = MINIMUM, maximum: number = MAXIMUM): number => Math.floor(Math.random() * (maximum - minimum + 1)) + minimum;
const getRandomAnimal = (): string => ANIMALS[Math.floor(Math.random() * ANIMALS.length)];

export const getRandomFlowID = (): string => `${getRandomAnimal()}_${getRandomNumber()}`.toLowerCase();
export const getRandomID = (): string => `${getRandomAnimal()}_${getRandomNumber()}`.toLowerCase();
@@ -18,7 +18,6 @@
import VueTour from "./components/onboarding/VueTour.vue";
import DefaultLayout from "override/components/layout/DefaultLayout.vue";
import DocIdDisplay from "./components/DocIdDisplay.vue";
import posthog from "posthog-js";
import "@kestra-io/ui-libs/style.css";

import {useApiStore} from "./stores/api";
@@ -26,6 +25,7 @@
import {useLayoutStore} from "./stores/layout";
import {useCoreStore} from "./stores/core";
import {useDocStore} from "./stores/doc";
import {initPostHogForSetup} from "./composables/usePosthog";
import {useMiscStore} from "./stores/misc";
import {useExecutionsStore} from "./stores/executions";
import * as BasicAuth from "./utils/basicAuth";
@@ -118,66 +118,10 @@
            uid: uid,
        });

        this.apiStore.loadConfig()
            .then(apiConfig => {
                this.initStats(apiConfig, config, uid);
            })
        await initPostHogForSetup(config);

        return config;
    },
    initStats(apiConfig, config, uid) {
        if (!this.configs || this.configs["isAnonymousUsageEnabled"] === false) {
            return;
        }

        // only run posthog in production
        if (import.meta.env.MODE === "production") {
            posthog.init(
                apiConfig.posthog.token,
                {
                    api_host: apiConfig.posthog.apiHost,
                    ui_host: "https://eu.posthog.com",
                    capture_pageview: false,
                    capture_pageleave: true,
                    autocapture: false,
                }
            )

            posthog.register_once(this.statsGlobalData(config, uid));

            if (!posthog.get_property("__alias")) {
                posthog.alias(apiConfig.id);
            }
        }

        let surveyVisible = false;
        window.addEventListener("PHSurveyShown", () => {
            surveyVisible = true;
        });

        window.addEventListener("PHSurveyClosed", () => {
            surveyVisible = false;
        })

        window.addEventListener("KestraRouterAfterEach", () => {
            if (surveyVisible) {
                window.dispatchEvent(new Event("PHSurveyClosed"))
                surveyVisible = false;
            }
        })
    },
    statsGlobalData(config, uid) {
        return {
            from: "APP",
            iid: config.uuid,
            uid: uid,
            app: {
                version: config.version,
                type: "OSS"
            }
        }
    },
},
watch: {
    $route: {

@@ -3,34 +3,37 @@
    <el-table-column v-for="(column, index) in generateTableColumns" :key="index" :prop="column" :label="column">
        <template #default="scope">
            <template v-if="isComplex(scope.row[column])">
                <editor
                    :full-height="false"
                    :input="true"
                    :navbar="false"
                    :model-value="JSON.stringify(scope.row[column])"
                    lang="json"
                    read-only
                <el-input
                    type="textarea"
                    :model-value="truncate(JSON.stringify(scope.row[column], null, 2))"
                    readonly
                    :rows="3"
                    autosize
                    class="ks-editor"
                    resize="none"
                />
            </template>
            <template v-else>
                {{ scope.row[column] }}
                {{ truncate(scope.row[column]) }}
            </template>
        </template>
    </el-table-column>
</el-table>
</template>
<script>
import Editor from "./inputs/Editor.vue";

export default {
    name: "ListPreview",
    components: {Editor},
    props: {
        value: {
            type: Array,
            required: true
        }
    },
    data() {
        return {
            maxColumnLength: 100
        }
    },
    computed: {
        generateTableColumns() {
            const allKeys = new Set();
@@ -43,6 +46,12 @@
    methods: {
        isComplex(data) {
            return data instanceof Array || data instanceof Object;
        },
        truncate(text) {
            if (typeof text !== "string") return text;
            return text.length > this.maxColumnLength
                ? text.slice(0, this.maxColumnLength) + "..."
                : text;
        }
    }
}

@@ -48,6 +48,7 @@
    v-on="activeTab['v-on'] ?? {}"
    ref="tabContent"
    :is="activeTab.component"
    :namespace="namespaceToForward"
    @go-to-detail="blueprintId => selectedBlueprintId = blueprintId"
    :embed="activeTab.props && activeTab.props.embed !== undefined ? activeTab.props.embed : true"
/>
@@ -163,16 +164,11 @@
        },
        getTabClasses(tab) {
            const isEnterpriseTab = tab.locked;
            const isGanttTab = tab.name === "gantt";
            const ROUTES = ["/flows/edit/", "/namespaces/edit/"];
            const EDIT_ROUTES = ROUTES.some(route => this.$route.path.startsWith(route));
            const isOverviewTab = EDIT_ROUTES && tab.title === "Overview";

            return {
                "container": !isEnterpriseTab && !isOverviewTab,
                "mt-4": !isEnterpriseTab && !isOverviewTab,
                "px-0": isEnterpriseTab && isOverviewTab,
                "gantt-container": isGanttTab
                "container": !isEnterpriseTab,
                "mt-4": !isEnterpriseTab,
                "px-0": isEnterpriseTab,
            };
        },
    },
@@ -209,6 +205,11 @@
        Object.entries(this.$attrs)
            .filter(([key]) => key !== "class")
        );
    },
    namespaceToForward(){
        return this.activeTab.props?.namespace ?? this.namespace;
        // in the special case of Namespace creation on Namespaces page, the tabs are loaded before the namespace creation
        // in this case this.props.namespace will be used
    }
}
};

@@ -208,7 +208,7 @@
import MailChecker from "mailchecker"
import {useMiscStore} from "../../stores/misc"
import {useSurveySkip} from "../../composables/useSurveyData"
import {initPostHogForSetup, trackSetupEvent} from "../../utils/setupPosthog"
import {initPostHogForSetup, trackSetupEvent} from "../../composables/usePosthog"

import Cogs from "vue-material-design-icons/Cogs.vue"
import AccountPlus from "vue-material-design-icons/AccountPlus.vue"
@@ -312,9 +312,9 @@
const setupConfigurationLines = computed<ConfigLine[]>(() => {
    if (!setupConfiguration.value) return []
    const configs = miscStore.configs

    const basicAuthValue = activeStep.value >= 1 || configs?.isBasicAuthInitialized

    return [
        {name: "repository", icon: Database, value: setupConfiguration.value.repositoryType || "NOT SETUP"},
        {name: "queue", icon: CurrentDc, value: setupConfiguration.value.queueType || "NOT SETUP"},
@@ -346,7 +346,7 @@
])

const EMAIL_REGEX = /^[a-zA-Z0-9_!#$%&'*+/=?`{|}~^.-]+@[a-zA-Z0-9.-]+$/
const PASSWORD_REGEX = /^(?=.*[A-Z])(?=.*\d).{8,}$/
const PASSWORD_REGEX = /^(?=.*[A-Z])(?=.*\d)\S{8,}$/

const validateEmail = (_rule: any, value: string, callback: (error?: Error) => void) => {
    if (!value) {
@@ -420,9 +420,9 @@
        user_email: userFormData.value.username
    }, userFormData.value)

    localStorage.setItem("basicAuthUserCreated", "true")

    nextStep()
} catch (error: any) {
    trackSetupEvent("setup_flow:account_creation_failed", {

@@ -1,6 +1,6 @@
<template>
    <div @click="handleClick" class="d-flex my-2 p-2 rounded element" :class="{'moved': moved}">
        <div class="me-2 icon">
        <div v-if="props.parentPathComplete !== 'inputs'" class="me-2 icon">
            <TaskIcon :cls="element.type" :icons only-icon />
        </div>

@@ -85,6 +85,7 @@

<style scoped lang="scss">
@import "../../styles/code.scss";
@import "@kestra-io/ui-libs/src/scss/_color-palette";

.element {
    cursor: pointer;
@@ -107,7 +108,8 @@
}

.playground-run-task{
    background-color: blue;
    color: $base-white;
    background-color: $base-blue-400;
    height: 16px;
    width: 16px;
    font-size: 4px;

@@ -30,7 +30,7 @@
</template>

<script setup lang="ts">
import {onMounted, computed, inject, ref, provide} from "vue";
import {onMounted, computed, inject, ref, provide, onActivated} from "vue";
import {useI18n} from "vue-i18n";
import {useStore} from "vuex";
import {usePluginsStore} from "../../../stores/plugins";
@@ -73,6 +73,10 @@
    return !complexObject
}

onActivated(() => {
    pluginsStore.updateDocumentation();
});

function onTaskUpdateField(key: string, val: any) {
    const realValue = val === null || val === undefined ? undefined :
        // allow array to be created with null values (specifically for metadata)
@@ -160,11 +164,8 @@
    task: parsedFlow.value,
})

const fieldsFromSchemaTop = computed(() => MAIN_KEYS.map(key => getFieldFromKey(key, "main")))

const fieldsFromSchemaRest = computed(() => {
    return Object.keys(pluginsStore.flowRootProperties ?? {})
        .filter((key) => !MAIN_KEYS.includes(key) && !HIDDEN_FIELDS.includes(key))

@@ -14,11 +14,11 @@
        />
    </section>

    <Sections :key :dashboard :charts :show-default="dashboard.id === 'default'" :padding="padding" />
    <Sections ref="dashboardComponent" :dashboard :charts :show-default="dashboard.id === 'default'" :padding="padding" />
</template>

<script setup lang="ts">
import {computed, onBeforeMount, ref} from "vue";
import {computed, onBeforeMount, ref, useTemplateRef} from "vue";

import type {Dashboard, Chart} from "./composables/useDashboards";
import {ALLOWED_CREATION_ROUTES, getDashboard, processFlowYaml} from "./composables/useDashboards";
@@ -43,8 +43,6 @@
import YAML_FLOW from "./assets/default_flow_definition.yaml?raw";
import YAML_NAMESPACE from "./assets/default_namespace_definition.yaml?raw";

import UTILS from "../../utils/utils.js";

import {useRoute, useRouter} from "vue-router";
const route = useRoute();
const router = useRouter();
@@ -65,21 +63,18 @@
const dashboard = ref<Dashboard>({id: "", charts: []});
const charts = ref<Chart[]>([]);

// We use a key to force re-rendering of the Sections component
let key = ref(UTILS.uid());

const loadCharts = async (allCharts: Chart[] = []) => {
    charts.value = [];

    for (const chart of allCharts) {
        charts.value.push({...chart, content: stringify(chart)});
    }

    refreshCharts()
};

const dashboardComponent = useTemplateRef("dashboardComponent");

const refreshCharts = () => {
    key.value = UTILS.uid();
    dashboardComponent.value!.refreshCharts();
};

const load = async (id = "default", defaultYAML = YAML_MAIN) => {

@@ -38,6 +38,8 @@
import type {Dashboard} from "../../../components/dashboard/composables/useDashboards";
import {getDashboard, processFlowYaml} from "../../../components/dashboard/composables/useDashboards";

import {getRandomID} from "../../../../scripts/id";

const dashboard = ref<Dashboard>({id: "", charts: []});
const save = async (source: string) => {
    const response = await dashboardStore.create(source)
@@ -69,6 +71,8 @@
    } else {
        dashboard.value.sourceCode = name === "namespaces/update" ? YAML_NAMESPACE : YAML_MAIN;
    }

    dashboard.value.sourceCode = "id: " + getRandomID() + "\n" + dashboard.value.sourceCode;
}
});

@@ -103,8 +103,6 @@
    </div>
</template>
<script setup>
import {YamlUtils as YAML_UTILS} from "@kestra-io/ui-libs";

import PluginDocumentation from "../../plugins/PluginDocumentation.vue";
import Sections from "../sections/Sections.vue";
import ValidationErrors from "../../flows/ValidationError.vue"
@@ -125,6 +123,8 @@
import yaml from "yaml";
import ContentSave from "vue-material-design-icons/ContentSave.vue";
import intro from "../../../assets/docs/dashboard_home.md?raw";
import * as YAML_UTILS from "@kestra-io/ui-libs/flow-yaml-utils";
import {useCoreStore} from "../../../stores/core.js";

export default {
    computed: {
@@ -144,6 +144,9 @@
        displaySide() {
            return this.currentView !== this.views.NONE && this.currentView !== this.views.DASHBOARD;
        },
        dashboardId() {
            return this.initialSource === undefined ? undefined : YAML_UTILS.parse(this.initialSource).id
        }
    },
    props: {
        allowSaveUnchanged: {
@@ -153,6 +156,10 @@
        initialSource: {
            type: String,
            default: undefined
        },
        modelValue: {
            type: String,
            default: undefined
        }
    },
    mounted() {
@@ -164,7 +171,7 @@
    methods: {
        async updatePluginDocumentation(event) {
            if (this.currentView === this.views.DOC) {
                const type = YAML_UTILS.getTaskType(event.model.getValue(), event.position, this.plugins)
                const type = YAML_UTILS.getTypeAtPosition(event.model.getValue(), event.position, this.plugins);
                if (type) {

                    this.pluginsStore.load({cls: type})
@@ -280,6 +287,23 @@
                this.errors = undefined;
            }
        });

        if (YAML_UTILS.parse(this.source).id !== this.dashboardId) {
            const coreStore = useCoreStore();
            coreStore.message = {
                variant: "error",
                title: this.$t("readonly property"),
                message: this.$t("dashboards.edition.id readonly"),
            };

            this.$nextTick(() => {
                this.source = YAML_UTILS.replaceBlockWithPath({
                    source: this.source,
                    path: "id",
                    newContent: this.dashboardId
                });
            })
        }
    }
},
beforeUnmount() {

@@ -92,6 +92,20 @@ export function defaultConfig(override, theme) {
    );
}

export function extractState(value) {
    if (!value || typeof value !== "string") return value;

    if (value.includes(",")) {
        const stateNames = State.arrayAllStates().map(state => state.name);
        const matchedState = value.split(",")
            .map(part => part.trim())
            .find(part => stateNames.includes(part.toUpperCase()));
        return matchedState || value;
    }

    return value;
}

export function chartClick(moment, router, route, event, parsedData, elements, type = "label") {
    const query = {};

@@ -107,7 +121,7 @@ export function chartClick(moment, router, route, event, parsedData, elements, t
        state = parsedData.labels[element.index];
    }
    if (state) {
        query.state = state;
        query.state = extractState(state);
        query.scope = "USER";
        query.size = 100;
        query.page = 1;
@@ -137,7 +151,7 @@
    }

    if (event.state) {
        query.state = event.state;
        query.state = extractState(event.state);
    }

    if (route.query.namespace) {

@@ -131,7 +131,7 @@ export function useChartGenerator(props: {chart: Chart; filters: string[]; showD
    const data = ref();
    const generate = async (id: string, pagination?: { pageNumber: number; pageSize: number }) => {
        const filters = props.filters.concat(decodeSearchParams(route.query, undefined, []) ?? []);
        const parameters: Parameters = {...(pagination ?? {}), ...(filters ?? {})};
        const parameters: Parameters = {...(pagination ?? {}), filters: (filters ?? {})};

        if (!props.showDefault) {
            data.value = await dashboardStore.generate(id, props.chart.id, parameters);

@@ -11,12 +11,12 @@
</template>

<script lang="ts" setup>
import {PropType, computed} from "vue";
import {PropType, computed, watch} from "vue";
import moment from "moment";
import {Bar} from "vue-chartjs";

import NoData from "../../layout/NoData.vue";
import type {Chart} from "../composables/useDashboards";
import {Chart, getDashboard} from "../composables/useDashboards";
import {useChartGenerator} from "../composables/useDashboards";

@@ -48,7 +48,7 @@
const DEFAULTS = {
    display: true,
    stacked: true,
    ticks: {maxTicksLimit: 8 , stepSize: 1},
    ticks: {maxTicksLimit: 8},
    grid: {display: false},
};

@@ -159,7 +159,19 @@
    return {labels, datasets};
});

const {data: generated} = useChartGenerator(props);
const {data: generated, generate} = useChartGenerator(props);

function refresh() {
    return generate(getDashboard(route, "id")!);
}

defineExpose({
    refresh
});

watch(() => route.params.filters, () => {
    refresh();
}, {deep: true});
</script>

<style lang="scss" scoped>
@@ -182,4 +194,4 @@
    min-height: var(--chart-height);
    max-height: var(--chart-height);
}
</style>
</style>

@@ -10,12 +10,13 @@
</template>

<script setup lang="ts">
import {PropType} from "vue";
import {PropType, watch} from "vue";

import type {Chart} from "../composables/useDashboards";
import {Chart, getDashboard} from "../composables/useDashboards";
import {getChartTitle, getPropertyValue, useChartGenerator} from "../composables/useDashboards";

import NoData from "../../layout/NoData.vue";
import {useRoute} from "vue-router";

const props = defineProps({
    chart: {type: Object as PropType<Chart>, required: true},
@@ -23,7 +24,21 @@
    showDefault: {type: Boolean, default: false},
});

const {percentageShown, EMPTY_TEXT, data} = useChartGenerator(props);
const route = useRoute();

const {percentageShown, EMPTY_TEXT, data, generate} = useChartGenerator(props);

function refresh() {
    return generate(getDashboard(route, "id")!);
}

defineExpose({
    refresh
});

watch(() => route.params.filters, () => {
    refresh();
}, {deep: true});
</script>

<style scoped lang="scss">

@@ -7,7 +7,7 @@
</template>

<script setup lang="ts">
import {PropType, onMounted, watch, ref} from "vue";
import {PropType, watch, ref} from "vue";

import type {RouteLocation} from "vue-router";

@@ -34,9 +34,17 @@
    else data.value = props.chart.content ?? props.chart.source?.content;
};

const dashboardID = (route: RouteLocation) => getDashboard(route, "id") || "default"
const dashboardID = (route: RouteLocation) => getDashboard(route, "id")!;

watch(route, async (changed) => await getData(dashboardID(changed)));
function refresh() {
    return getData(dashboardID(route));
}

onMounted(async () => await getData(dashboardID(route)));
defineExpose({
    refresh
});

watch(() => route.params.filters, () => {
    refresh();
}, {deep: true, immediate: true});
</script>

@@ -22,9 +22,9 @@
</template>

<script lang="ts" setup>
import {computed,PropType} from "vue";
import {computed, PropType, watch} from "vue";

import type {Chart} from "../composables/useDashboards";
import {Chart, getDashboard} from "../composables/useDashboards";
import {useChartGenerator} from "../composables/useDashboards";

@@ -183,7 +183,19 @@
    };
});

const {data: generated} = useChartGenerator(props);
const {data: generated, generate} = useChartGenerator(props);

function refresh() {
    return generate(getDashboard(route, "id")!);
}

defineExpose({
    refresh
});

watch(() => route.params.filters, () => {
    refresh();
}, {deep: true});
</script>

<style lang="scss" scoped>
@@ -192,4 +204,4 @@
.chart {
    max-height: $height;
}
</style>
</style>

Some files were not shown because too many files have changed in this diff.