mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 14:00:23 -05:00
Compare commits
187 Commits
fix/filter
...
issue/4659
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
343c66f125 | ||
|
|
577f813eef | ||
|
|
06a9f13676 | ||
|
|
1fd6e23f96 | ||
|
|
9a32780c8c | ||
|
|
af140baa66 | ||
|
|
54b0183b95 | ||
|
|
64de3d5fa8 | ||
|
|
4c17aadb81 | ||
|
|
bf424fbf53 | ||
|
|
edcdb88559 | ||
|
|
9a9d0b995a | ||
|
|
5c5d313fb0 | ||
|
|
dfd4d87867 | ||
|
|
367d773a86 | ||
|
|
c819f15c66 | ||
|
|
673b5c994c | ||
|
|
2acf37e0e6 | ||
|
|
0d7fcbb936 | ||
|
|
42b01d6951 | ||
|
|
9edfb01920 | ||
|
|
7813337f48 | ||
|
|
ea0342f82a | ||
|
|
ca8f25108e | ||
|
|
49b6c331a6 | ||
|
|
e409fb7ac0 | ||
|
|
0b64c29794 | ||
|
|
c4665460aa | ||
|
|
5423b6e3a7 | ||
|
|
114669e1b5 | ||
|
|
d75f0ced38 | ||
|
|
0a788d8429 | ||
|
|
8c25d1bbd7 | ||
|
|
4e2e8f294f | ||
|
|
2c34804ce2 | ||
|
|
bab4eef790 | ||
|
|
94aa628ac1 | ||
|
|
da180fbc00 | ||
|
|
c7bd592bc7 | ||
|
|
693d174960 | ||
|
|
8ee492b9c5 | ||
|
|
d6b8ba34ea | ||
|
|
08cc853e00 | ||
|
|
4f68715483 | ||
|
|
edde1b6730 | ||
|
|
399446f52e | ||
|
|
c717890fbc | ||
|
|
5328b0c574 | ||
|
|
de14cae1f0 | ||
|
|
d8a3e703e7 | ||
|
|
90659bc320 | ||
|
|
37d1d8856e | ||
|
|
93a4eb5cbc | ||
|
|
de160c8a2d | ||
|
|
28458b59eb | ||
|
|
2a256d9505 | ||
|
|
9008b21007 | ||
|
|
8c13bf6a71 | ||
|
|
43888cc3dd | ||
|
|
c94093d9f6 | ||
|
|
8779dec28a | ||
|
|
41614c3a6e | ||
|
|
6b4fdd0688 | ||
|
|
0319f3d267 | ||
|
|
0b37fe2cb8 | ||
|
|
e623dd7729 | ||
|
|
db4f7cb4ff | ||
|
|
b14b16db0e | ||
|
|
77f6cec0e4 | ||
|
|
1748b18d66 | ||
|
|
32f96348c1 | ||
|
|
07db0a8c80 | ||
|
|
2035fd42c3 | ||
|
|
2856bf07e8 | ||
|
|
f5327cec33 | ||
|
|
42955936b2 | ||
|
|
771b98e023 | ||
|
|
b80e8487e3 | ||
|
|
f35a0b6d60 | ||
|
|
0c9ed17f1c | ||
|
|
7ca20371f8 | ||
|
|
8ff3454cbd | ||
|
|
09593d9fd2 | ||
|
|
d3cccf36f0 | ||
|
|
eeb91cd9ed | ||
|
|
2679b0f067 | ||
|
|
54281864c8 | ||
|
|
e4f9b11d0c | ||
|
|
12cef0593c | ||
|
|
c6cf8f307f | ||
|
|
3b4eb55f84 | ||
|
|
d32949985d | ||
|
|
c051ca2e66 | ||
|
|
93a456963b | ||
|
|
9a45f17680 | ||
|
|
5fb6806d74 | ||
|
|
f3cff72edd | ||
|
|
0abc660e7d | ||
|
|
f09ca3d92e | ||
|
|
9fd778fca1 | ||
|
|
667af25e1b | ||
|
|
1b1aed5ff1 | ||
|
|
da1bb58199 | ||
|
|
d3e661f9f8 | ||
|
|
2126c8815e | ||
|
|
6cfc5b8799 | ||
|
|
16d44034f0 | ||
|
|
f76e62a4af | ||
|
|
f6645da94c | ||
|
|
93b2bbf0d0 | ||
|
|
9d46e2aece | ||
|
|
133315a2a5 | ||
|
|
b96b9bb414 | ||
|
|
9865d8a7dc | ||
|
|
29f22c2f81 | ||
|
|
3e69469381 | ||
|
|
38c24ccf7f | ||
|
|
12cf41a309 | ||
|
|
7b8ea0d885 | ||
|
|
cf88bbcb12 | ||
|
|
6abe7f96e7 | ||
|
|
e73ac78d8b | ||
|
|
b0687eb702 | ||
|
|
85f9070f56 | ||
|
|
0a42ab40ec | ||
|
|
856d2d1d51 | ||
|
|
a7d6dbc8a3 | ||
|
|
cf82109da6 | ||
|
|
d4168ba424 | ||
|
|
46a294f25a | ||
|
|
a229036d8d | ||
|
|
a518fefecd | ||
|
|
1d3210fd7d | ||
|
|
597f84ecb7 | ||
|
|
5f3c7ac9f0 | ||
|
|
77c4691b04 | ||
|
|
6d34416529 | ||
|
|
40a67d5dcd | ||
|
|
2c68c704f6 | ||
|
|
e59d9f622c | ||
|
|
c951ba39a7 | ||
|
|
a0200cfacb | ||
|
|
c6310f0697 | ||
|
|
21ba59a525 | ||
|
|
4f9e3cd06c | ||
|
|
e74010d1a4 | ||
|
|
465e6467e9 | ||
|
|
c68c1b16d9 | ||
|
|
468c32156e | ||
|
|
6e0a1c61ef | ||
|
|
552d55ef6b | ||
|
|
08b0b682bf | ||
|
|
cff90c93bb | ||
|
|
ea465056d0 | ||
|
|
02f150f0b0 | ||
|
|
95d95d3d3c | ||
|
|
6b8d3d6928 | ||
|
|
1e347073ca | ||
|
|
ac09dcecd9 | ||
|
|
40b337cd22 | ||
|
|
5377d16036 | ||
|
|
f717bc413f | ||
|
|
d6bed2d235 | ||
|
|
07fd74b238 | ||
|
|
60eef29de2 | ||
|
|
20ca7b6380 | ||
|
|
9d82df61c6 | ||
|
|
e78210b5eb | ||
|
|
83143fae83 | ||
|
|
25f5ccc6b5 | ||
|
|
cf3e49a284 | ||
|
|
9a72d378df | ||
|
|
752a927fac | ||
|
|
4053392921 | ||
|
|
8b0483643a | ||
|
|
5feeb41c7a | ||
|
|
d7f5e5c05d | ||
|
|
4840f723fc | ||
|
|
8cf159b281 | ||
|
|
4c79576113 | ||
|
|
f87f2ed753 | ||
|
|
298a6c7ca8 | ||
|
|
ab464fff6e | ||
|
|
6dcba16314 | ||
|
|
80a328e87e | ||
|
|
f2034f4975 | ||
|
|
edca56d168 |
54
.github/actions/plugins-list/action.yml
vendored
54
.github/actions/plugins-list/action.yml
vendored
@@ -1,20 +1,28 @@
|
||||
name: 'Load Kestra Plugin List'
|
||||
description: 'Composite action to load list of plugins'
|
||||
description: 'Composite action to load list of plugins (from .plugins) and output repositories and GA coordinates'
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
description: "Kestra version placeholder to replace LATEST in GA coordinates"
|
||||
default: 'LATEST'
|
||||
required: true
|
||||
plugin-file:
|
||||
description: "File of the plugins"
|
||||
description: "Path to the .plugins file"
|
||||
default: './.plugins'
|
||||
required: true
|
||||
include:
|
||||
description: "Regex include filter applied on repository names"
|
||||
required: false
|
||||
default: ''
|
||||
exclude:
|
||||
description: "Regex exclude filter applied on repository names"
|
||||
required: false
|
||||
default: ''
|
||||
outputs:
|
||||
plugins:
|
||||
description: "List of all Kestra plugins"
|
||||
description: "Space-separated list of GA coordinates (group:artifact:version)"
|
||||
value: ${{ steps.plugins.outputs.plugins }}
|
||||
repositories:
|
||||
description: "List of all Kestra repositories of plugins"
|
||||
description: "Space-separated list of repository names (e.g., plugin-ai plugin-airbyte)"
|
||||
value: ${{ steps.plugins.outputs.repositories }}
|
||||
runs:
|
||||
using: composite
|
||||
@@ -23,7 +31,35 @@ runs:
|
||||
id: plugins
|
||||
shell: bash
|
||||
run: |
|
||||
PLUGINS=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | sed -e "s/LATEST/${{ inputs.plugin-version }}/g" | cut -d':' -f2- | xargs || echo '');
|
||||
REPOSITORIES=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | cut -d':' -f1 | uniq | sort | xargs || echo '')
|
||||
echo "plugins=$PLUGINS" >> $GITHUB_OUTPUT
|
||||
echo "repositories=$REPOSITORIES" >> $GITHUB_OUTPUT
|
||||
set -euo pipefail
|
||||
|
||||
# Read only uncommented lines that contain io.kestra.* coordinates.
|
||||
# This avoids the previous approach that 'uncommented' lines by stripping the first char after '#'.
|
||||
if [[ -f "${{ inputs.plugin-file }}" ]]; then
|
||||
ENABLED_LINES=$(grep -E '^\s*[^#]' "${{ inputs.plugin-file }}" | grep "io\.kestra\." || true)
|
||||
else
|
||||
ENABLED_LINES=""
|
||||
fi
|
||||
|
||||
# Build GA coordinates by replacing LATEST with the provided plugin-version (if present)
|
||||
PLUGINS=$(echo "$ENABLED_LINES" \
|
||||
| sed -e "s/LATEST/${{ inputs.plugin-version }}/g" \
|
||||
| cut -d':' -f2- \
|
||||
| xargs || echo '')
|
||||
|
||||
# Extract repository names (first column), unique + sorted
|
||||
REPOSITORIES=$(echo "$ENABLED_LINES" \
|
||||
| cut -d':' -f1 \
|
||||
| uniq | sort \
|
||||
| xargs || echo '')
|
||||
|
||||
# Apply include/exclude filters if provided (POSIX ERE via grep -E)
|
||||
if [ -n "${{ inputs.include }}" ] && [ -n "$REPOSITORIES" ]; then
|
||||
REPOSITORIES=$(echo "$REPOSITORIES" | xargs -n1 | grep -E "${{ inputs.include }}" | xargs || true)
|
||||
fi
|
||||
if [ -n "${{ inputs.exclude }}" ] && [ -n "$REPOSITORIES" ]; then
|
||||
REPOSITORIES=$(echo "$REPOSITORIES" | xargs -n1 | grep -Ev "${{ inputs.exclude }}" | xargs || true)
|
||||
fi
|
||||
|
||||
echo "plugins=$PLUGINS" >> "$GITHUB_OUTPUT"
|
||||
echo "repositories=$REPOSITORIES" >> "$GITHUB_OUTPUT"
|
||||
|
||||
37
.github/workflows/docker.yml
vendored
37
.github/workflows/docker.yml
vendored
@@ -20,6 +20,15 @@ on:
|
||||
required: false
|
||||
type: string
|
||||
default: "LATEST"
|
||||
force-download-artifact:
|
||||
description: 'Force download artifact'
|
||||
required: false
|
||||
type: string
|
||||
default: "true"
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
|
||||
env:
|
||||
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
jobs:
|
||||
@@ -38,9 +47,18 @@ jobs:
|
||||
id: plugins
|
||||
with:
|
||||
plugin-version: ${{ env.PLUGIN_VERSION }}
|
||||
|
||||
# ********************************************************************************************************************
|
||||
# Build
|
||||
# ********************************************************************************************************************
|
||||
build-artifacts:
|
||||
name: Build Artifacts
|
||||
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
|
||||
uses: ./.github/workflows/workflow-build-artifacts.yml
|
||||
|
||||
docker:
|
||||
name: Publish Docker
|
||||
needs: [ plugins ]
|
||||
needs: [ plugins, build-artifacts ]
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
@@ -73,14 +91,27 @@ jobs:
|
||||
else
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
# Download release
|
||||
- name: Download release
|
||||
|
||||
# [workflow_dispatch]
|
||||
# Download executable from GitHub Release
|
||||
- name: Artifacts - Download release (workflow_dispatch)
|
||||
id: download-github-release
|
||||
if: github.event_name == 'workflow_dispatch' && github.event.inputs.force-download-artifact == 'false'
|
||||
uses: robinraju/release-downloader@v1.12
|
||||
with:
|
||||
tag: ${{steps.vars.outputs.tag}}
|
||||
fileName: 'kestra-*'
|
||||
out-file-path: build/executable
|
||||
|
||||
# [workflow_call]
|
||||
# Download executable from artifact
|
||||
- name: Artifacts - Download executable
|
||||
if: github.event_name != 'workflow_dispatch' || steps.download-github-release.outcome == 'skipped'
|
||||
uses: actions/download-artifact@v5
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable
|
||||
|
||||
- name: Copy exe to image
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
148
.github/workflows/gradle-release-plugins.yml
vendored
148
.github/workflows/gradle-release-plugins.yml
vendored
@@ -15,24 +15,111 @@ on:
|
||||
description: 'Use DRY_RUN mode'
|
||||
required: false
|
||||
default: 'false'
|
||||
type: choice
|
||||
options: ['false', 'true']
|
||||
repositories:
|
||||
description: 'Space-separated repo names to release (e.g. "plugin-ai plugin-airbyte"). If empty, uses .plugins.'
|
||||
required: false
|
||||
type: string
|
||||
include:
|
||||
description: 'Regex include filter on repo names (applied when using .plugins)'
|
||||
required: false
|
||||
type: string
|
||||
exclude:
|
||||
description: 'Regex exclude filter on repo names (applied when using .plugins)'
|
||||
required: false
|
||||
type: string
|
||||
onlyChanged:
|
||||
description: 'Release only repos changed since last tag (or sinceTag if provided)'
|
||||
required: false
|
||||
default: 'false'
|
||||
type: choice
|
||||
options: ['false', 'true']
|
||||
sinceTag:
|
||||
description: 'Optional tag used as base for change detection (e.g. v0.24.0)'
|
||||
required: false
|
||||
type: string
|
||||
|
||||
jobs:
|
||||
release:
|
||||
name: Release plugins
|
||||
prepare:
|
||||
name: Compute target repositories
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
matrix: ${{ steps.compute.outputs.matrix }}
|
||||
steps:
|
||||
# Checkout
|
||||
# Checkout the current repo (assumed to contain .plugins and the workflow)
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
# Checkout the kestra-io/actions repo (for setup-build, etc.)
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
ref: main
|
||||
|
||||
# Setup build
|
||||
- name: Install tools
|
||||
run: sudo apt-get update && sudo apt-get install -y jq
|
||||
|
||||
# Load repositories from .plugins (only uncommented lines) with optional include/exclude filters
|
||||
- name: Get Plugins List
|
||||
id: plugins-list
|
||||
uses: ./.github/actions/plugins-list
|
||||
with:
|
||||
plugin-version: 'LATEST'
|
||||
plugin-file: './.plugins'
|
||||
include: ${{ github.event.inputs.include }}
|
||||
exclude: ${{ github.event.inputs.exclude }}
|
||||
|
||||
# Finalize repo list:
|
||||
# - If "repositories" input is provided, it takes precedence.
|
||||
# - Otherwise, use the filtered list from the composite action.
|
||||
- name: Build repo list
|
||||
id: build-list
|
||||
shell: bash
|
||||
env:
|
||||
INP_REPOS: ${{ github.event.inputs.repositories }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
if [ -n "${INP_REPOS:-}" ]; then
|
||||
LIST="${INP_REPOS}"
|
||||
else
|
||||
LIST="${{ steps.plugins-list.outputs.repositories }}"
|
||||
fi
|
||||
# Convert to JSON array for matrix
|
||||
arr_json=$(printf '%s\n' $LIST | jq -R . | jq -s .)
|
||||
echo "list=$LIST" >> "$GITHUB_OUTPUT"
|
||||
echo "arr_json=$arr_json" >> "$GITHUB_OUTPUT"
|
||||
|
||||
- name: Compute matrix
|
||||
id: compute
|
||||
shell: bash
|
||||
run: |
|
||||
set -euo pipefail
|
||||
echo "matrix={\"repo\": ${{ steps.build-list.outputs.arr_json }}}" >> "$GITHUB_OUTPUT"
|
||||
|
||||
release:
|
||||
name: Release ${{ matrix.repo }}
|
||||
needs: [prepare]
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix: ${{ fromJson(needs.prepare.outputs.matrix) }}
|
||||
steps:
|
||||
# Checkout the current repo (for dev-tools/release-plugins.sh)
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout the kestra-io/actions repo (for setup-build, etc.)
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
ref: main
|
||||
|
||||
# Build toolchain used by plugin builds
|
||||
- uses: ./actions/.github/actions/setup-build
|
||||
id: build
|
||||
with:
|
||||
@@ -41,42 +128,45 @@ jobs:
|
||||
python-enabled: true
|
||||
caches-enabled: true
|
||||
|
||||
# Get Plugins List
|
||||
- name: Get Plugins List
|
||||
uses: ./.github/actions/plugins-list
|
||||
id: plugins-list
|
||||
with:
|
||||
plugin-version: 'LATEST'
|
||||
|
||||
- name: 'Configure Git'
|
||||
- name: Configure Git
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
|
||||
# Execute
|
||||
- name: Run Gradle Release
|
||||
if: ${{ github.event.inputs.dryRun == 'false' }}
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./dev-tools/release-plugins.sh;
|
||||
|
||||
./dev-tools/release-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--next-version=${{github.event.inputs.nextVersion}} \
|
||||
--yes \
|
||||
${{ steps.plugins-list.outputs.repositories }}
|
||||
chmod +x ./dev-tools/release-plugins.sh
|
||||
ARGS=()
|
||||
ARGS+=(--release-version="${{ github.event.inputs.releaseVersion }}")
|
||||
ARGS+=(--next-version="${{ github.event.inputs.nextVersion }}")
|
||||
ARGS+=(--yes)
|
||||
if [ "${{ github.event.inputs.onlyChanged }}" = "true" ]; then
|
||||
ARGS+=(--only-changed)
|
||||
fi
|
||||
if [ -n "${{ github.event.inputs.sinceTag }}" ]; then
|
||||
ARGS+=(--since-tag="${{ github.event.inputs.sinceTag }}")
|
||||
fi
|
||||
./dev-tools/release-plugins.sh "${ARGS[@]}" "${{ matrix.repo }}"
|
||||
|
||||
# Dry-run release
|
||||
- name: Run Gradle Release (DRY_RUN)
|
||||
if: ${{ github.event.inputs.dryRun == 'true' }}
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./dev-tools/release-plugins.sh;
|
||||
|
||||
./dev-tools/release-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--next-version=${{github.event.inputs.nextVersion}} \
|
||||
--dry-run \
|
||||
--yes \
|
||||
${{ steps.plugins-list.outputs.repositories }}
|
||||
chmod +x ./dev-tools/release-plugins.sh
|
||||
ARGS=()
|
||||
ARGS+=(--release-version="${{ github.event.inputs.releaseVersion }}")
|
||||
ARGS+=(--next-version="${{ github.event.inputs.nextVersion }}")
|
||||
ARGS+=(--dry-run)
|
||||
ARGS+=(--yes)
|
||||
if [ "${{ github.event.inputs.onlyChanged }}" = "true" ]; then
|
||||
ARGS+=(--only-changed)
|
||||
fi
|
||||
if [ -n "${{ github.event.inputs.sinceTag }}" ]; then
|
||||
ARGS+=(--since-tag="${{ github.event.inputs.sinceTag }}")
|
||||
fi
|
||||
./dev-tools/release-plugins.sh "${ARGS[@]}" "${{ matrix.repo }}"
|
||||
|
||||
3
.github/workflows/main.yml
vendored
3
.github/workflows/main.yml
vendored
@@ -43,7 +43,8 @@ jobs:
|
||||
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
|
||||
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
|
||||
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
|
||||
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
|
||||
74
.github/workflows/workflow-build-artifacts.yml
vendored
74
.github/workflows/workflow-build-artifacts.yml
vendored
@@ -1,23 +1,7 @@
|
||||
name: Build Artifacts
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
default: 'LATEST'
|
||||
required: true
|
||||
type: string
|
||||
outputs:
|
||||
docker-tag:
|
||||
value: ${{ jobs.build.outputs.docker-tag }}
|
||||
description: "The Docker image Tag for Kestra"
|
||||
docker-artifact-name:
|
||||
value: ${{ jobs.build.outputs.docker-artifact-name }}
|
||||
description: "The GitHub artifact containing the Kestra docker image name."
|
||||
plugins:
|
||||
value: ${{ jobs.build.outputs.plugins }}
|
||||
description: "The Kestra plugins list used for the build."
|
||||
workflow_call: {}
|
||||
|
||||
jobs:
|
||||
build:
|
||||
@@ -82,55 +66,6 @@ jobs:
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
# Docker Tag
|
||||
- name: Setup - Docker vars
|
||||
id: vars
|
||||
shell: bash
|
||||
run: |
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
if [[ $TAG = "master" ]]
|
||||
then
|
||||
TAG="latest";
|
||||
elif [[ $TAG = "develop" ]]
|
||||
then
|
||||
TAG="develop";
|
||||
elif [[ $TAG = v* ]]
|
||||
then
|
||||
TAG="${TAG}";
|
||||
else
|
||||
TAG="build-${{ github.run_id }}";
|
||||
fi
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
echo "artifact=docker-kestra-${TAG}" >> $GITHUB_OUTPUT
|
||||
|
||||
# Docker setup
|
||||
- name: Docker - Setup QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Fix Qemu
|
||||
shell: bash
|
||||
run: |
|
||||
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
|
||||
|
||||
- name: Docker - Setup Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
# Docker Build
|
||||
- name: Docker - Build & export image
|
||||
uses: docker/build-push-action@v6
|
||||
if: "!startsWith(github.ref, 'refs/tags/v')"
|
||||
with:
|
||||
context: .
|
||||
push: false
|
||||
file: Dockerfile
|
||||
tags: |
|
||||
kestra/kestra:${{ steps.vars.outputs.tag }}
|
||||
build-args: |
|
||||
KESTRA_PLUGINS=${{ steps.plugins.outputs.plugins }}
|
||||
APT_PACKAGES=${{ env.DOCKER_APT_PACKAGES }}
|
||||
PYTHON_LIBRARIES=${{ env.DOCKER_PYTHON_LIBRARIES }}
|
||||
outputs: type=docker,dest=/tmp/${{ steps.vars.outputs.artifact }}.tar
|
||||
|
||||
# Upload artifacts
|
||||
- name: Artifacts - Upload JAR
|
||||
uses: actions/upload-artifact@v4
|
||||
@@ -143,10 +78,3 @@ jobs:
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable/
|
||||
|
||||
- name: Artifacts - Upload Docker
|
||||
uses: actions/upload-artifact@v4
|
||||
if: "!startsWith(github.ref, 'refs/tags/v')"
|
||||
with:
|
||||
name: ${{ steps.vars.outputs.artifact }}
|
||||
path: /tmp/${{ steps.vars.outputs.artifact }}.tar
|
||||
|
||||
11
.github/workflows/workflow-github-release.yml
vendored
11
.github/workflows/workflow-github-release.yml
vendored
@@ -1,14 +1,17 @@
|
||||
name: Github - Release
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
workflow_call:
|
||||
secrets:
|
||||
GH_PERSONAL_TOKEN:
|
||||
description: "The Github personal token."
|
||||
required: true
|
||||
push:
|
||||
tags:
|
||||
- '*'
|
||||
SLACK_RELEASES_WEBHOOK_URL:
|
||||
description: "The Slack webhook URL."
|
||||
required: true
|
||||
|
||||
|
||||
|
||||
jobs:
|
||||
publish:
|
||||
@@ -35,7 +38,7 @@ jobs:
|
||||
# Download Exec
|
||||
# Must be done after checkout actions
|
||||
- name: Artifacts - Download executable
|
||||
uses: actions/download-artifact@v4
|
||||
uses: actions/download-artifact@v5
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
with:
|
||||
name: exe
|
||||
|
||||
@@ -41,8 +41,6 @@ jobs:
|
||||
name: Build Artifacts
|
||||
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
|
||||
uses: ./.github/workflows/workflow-build-artifacts.yml
|
||||
with:
|
||||
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
# ********************************************************************************************************************
|
||||
# Docker
|
||||
# ********************************************************************************************************************
|
||||
@@ -122,7 +120,7 @@ jobs:
|
||||
|
||||
# Build Docker Image
|
||||
- name: Artifacts - Download executable
|
||||
uses: actions/download-artifact@v4
|
||||
uses: actions/download-artifact@v5
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable
|
||||
|
||||
15
.github/workflows/workflow-pullrequest-delete-docker.yml
vendored
Normal file
15
.github/workflows/workflow-pullrequest-delete-docker.yml
vendored
Normal file
@@ -0,0 +1,15 @@
|
||||
name: Pull Request - Delete Docker
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [closed]
|
||||
|
||||
jobs:
|
||||
publish:
|
||||
name: Pull Request - Delete Docker
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: dataaxiom/ghcr-cleanup-action@v1
|
||||
with:
|
||||
package: kestra-pr
|
||||
delete-tags: ${{ github.event.pull_request.number }}
|
||||
76
.github/workflows/workflow-pullrequest-publish-docker.yml
vendored
Normal file
76
.github/workflows/workflow-pullrequest-publish-docker.yml
vendored
Normal file
@@ -0,0 +1,76 @@
|
||||
name: Pull Request - Publish Docker
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
branches:
|
||||
- develop
|
||||
|
||||
jobs:
|
||||
build-artifacts:
|
||||
name: Build Artifacts
|
||||
uses: ./.github/workflows/workflow-build-artifacts.yml
|
||||
|
||||
publish:
|
||||
name: Publish Docker
|
||||
runs-on: ubuntu-latest
|
||||
needs: build-artifacts
|
||||
env:
|
||||
GITHUB_IMAGE_PATH: "ghcr.io/kestra-io/kestra-pr"
|
||||
steps:
|
||||
- name: Checkout - Current ref
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Docker setup
|
||||
- name: Docker - Setup QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Setup Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
# Docker Login
|
||||
- name: Login to GHCR
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
# Build Docker Image
|
||||
- name: Artifacts - Download executable
|
||||
uses: actions/download-artifact@v5
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable
|
||||
|
||||
- name: Docker - Copy exe to image
|
||||
shell: bash
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
- name: Docker - Build image
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
file: ./Dockerfile.pr
|
||||
push: true
|
||||
tags: ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}
|
||||
platforms: linux/amd64,linux/arm64
|
||||
|
||||
# Add comment on pull request
|
||||
- name: Add comment to PR
|
||||
uses: actions/github-script@v7
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
script: |
|
||||
await github.rest.issues.createComment({
|
||||
issue_number: context.issue.number,
|
||||
owner: context.repo.owner,
|
||||
repo: context.repo.repo,
|
||||
body: `**🐋 Docker image**: \`${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}\`\n` +
|
||||
`\n` +
|
||||
`\`\`\`bash\n` +
|
||||
`docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }} server local\n` +
|
||||
`\`\`\``
|
||||
})
|
||||
11
.github/workflows/workflow-release.yml
vendored
11
.github/workflows/workflow-release.yml
vendored
@@ -42,12 +42,16 @@ on:
|
||||
SONATYPE_GPG_FILE:
|
||||
description: "The Sonatype GPG file."
|
||||
required: true
|
||||
GH_PERSONAL_TOKEN:
|
||||
description: "GH personnal Token."
|
||||
required: true
|
||||
SLACK_RELEASES_WEBHOOK_URL:
|
||||
description: "Slack webhook for releases channel."
|
||||
required: true
|
||||
jobs:
|
||||
build-artifacts:
|
||||
name: Build - Artifacts
|
||||
uses: ./.github/workflows/workflow-build-artifacts.yml
|
||||
with:
|
||||
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
|
||||
Docker:
|
||||
name: Publish Docker
|
||||
@@ -77,4 +81,5 @@ jobs:
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
uses: ./.github/workflows/workflow-github-release.yml
|
||||
secrets:
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
|
||||
7
.plugins
7
.plugins
@@ -19,6 +19,7 @@
|
||||
#plugin-databricks:io.kestra.plugin:plugin-databricks:LATEST
|
||||
#plugin-datahub:io.kestra.plugin:plugin-datahub:LATEST
|
||||
#plugin-dataform:io.kestra.plugin:plugin-dataform:LATEST
|
||||
#plugin-datagen:io.kestra.plugin:plugin-datagen:LATEST
|
||||
#plugin-dbt:io.kestra.plugin:plugin-dbt:LATEST
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-db2:LATEST
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-mongodb:LATEST
|
||||
@@ -26,6 +27,7 @@
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-oracle:LATEST
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-postgres:LATEST
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-sqlserver:LATEST
|
||||
#plugin-deepseek:io.kestra.plugin:plugin-deepseek:LATEST
|
||||
#plugin-docker:io.kestra.plugin:plugin-docker:LATEST
|
||||
#plugin-elasticsearch:io.kestra.plugin:plugin-elasticsearch:LATEST
|
||||
#plugin-fivetran:io.kestra.plugin:plugin-fivetran:LATEST
|
||||
@@ -86,13 +88,18 @@
|
||||
#plugin-powerbi:io.kestra.plugin:plugin-powerbi:LATEST
|
||||
#plugin-pulsar:io.kestra.plugin:plugin-pulsar:LATEST
|
||||
#plugin-redis:io.kestra.plugin:plugin-redis:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-bun:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-deno:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-go:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-groovy:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-jbang:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-julia:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-jython:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-lua:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-nashorn:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-node:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-perl:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-php:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-powershell:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-python:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-r:LATEST
|
||||
|
||||
7
Dockerfile.pr
Normal file
7
Dockerfile.pr
Normal file
@@ -0,0 +1,7 @@
|
||||
FROM kestra/kestra:develop
|
||||
|
||||
USER root
|
||||
|
||||
COPY --chown=kestra:kestra docker /
|
||||
|
||||
USER kestra
|
||||
@@ -65,10 +65,6 @@ Kestra is an open-source, event-driven orchestration platform that makes both **
|
||||
|
||||
## 🚀 Quick Start
|
||||
|
||||
### Try the Live Demo
|
||||
|
||||
Try Kestra with our [**Live Demo**](https://demo.kestra.io/ui/login?auto). No installation required!
|
||||
|
||||
### Get Started Locally in 5 Minutes
|
||||
|
||||
#### Launch Kestra in Docker
|
||||
|
||||
50
build.gradle
50
build.gradle
@@ -16,7 +16,7 @@ plugins {
|
||||
id "java"
|
||||
id 'java-library'
|
||||
id "idea"
|
||||
id "com.gradleup.shadow" version "8.3.8"
|
||||
id "com.gradleup.shadow" version "8.3.9"
|
||||
id "application"
|
||||
|
||||
// test
|
||||
@@ -225,14 +225,14 @@ subprojects {
|
||||
}
|
||||
|
||||
testlogger {
|
||||
theme 'mocha-parallel'
|
||||
showExceptions true
|
||||
showFullStackTraces true
|
||||
showCauses true
|
||||
slowThreshold 2000
|
||||
showStandardStreams true
|
||||
showPassedStandardStreams false
|
||||
showSkippedStandardStreams true
|
||||
theme = 'mocha-parallel'
|
||||
showExceptions = true
|
||||
showFullStackTraces = true
|
||||
showCauses = true
|
||||
slowThreshold = 2000
|
||||
showStandardStreams = true
|
||||
showPassedStandardStreams = false
|
||||
showSkippedStandardStreams = true
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -410,7 +410,7 @@ jar {
|
||||
shadowJar {
|
||||
archiveClassifier.set(null)
|
||||
mergeServiceFiles()
|
||||
zip64 true
|
||||
zip64 = true
|
||||
}
|
||||
|
||||
distZip.dependsOn shadowJar
|
||||
@@ -427,8 +427,8 @@ def executableDir = layout.buildDirectory.dir("executable")
|
||||
def executable = layout.buildDirectory.file("executable/${project.name}-${project.version}").get().asFile
|
||||
|
||||
tasks.register('writeExecutableJar') {
|
||||
group "build"
|
||||
description "Write an executable jar from shadow jar"
|
||||
group = "build"
|
||||
description = "Write an executable jar from shadow jar"
|
||||
dependsOn = [shadowJar]
|
||||
|
||||
final shadowJarFile = tasks.shadowJar.outputs.files.singleFile
|
||||
@@ -454,8 +454,8 @@ tasks.register('writeExecutableJar') {
|
||||
}
|
||||
|
||||
tasks.register('executableJar', Zip) {
|
||||
group "build"
|
||||
description "Zip the executable jar"
|
||||
group = "build"
|
||||
description = "Zip the executable jar"
|
||||
dependsOn = [writeExecutableJar]
|
||||
|
||||
archiveFileName = "${project.name}-${project.version}.zip"
|
||||
@@ -620,6 +620,28 @@ subprojects {subProject ->
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (subProject.name != 'platform' && subProject.name != 'cli') {
|
||||
// only if a test source set actually exists (avoids empty artifacts)
|
||||
def hasTests = subProject.extensions.findByName('sourceSets')?.findByName('test') != null
|
||||
|
||||
if (hasTests) {
|
||||
// wire the artifact onto every Maven publication of this subproject
|
||||
publishing {
|
||||
publications {
|
||||
withType(MavenPublication).configureEach { pub ->
|
||||
// keep the normal java component + sources/javadoc already configured
|
||||
pub.artifact(subProject.tasks.named('testsJar').get())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// make sure publish tasks build the tests jar first
|
||||
tasks.matching { it.name.startsWith('publish') }.configureEach {
|
||||
dependsOn subProject.tasks.named('testsJar')
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,6 @@ abstract public class AbstractServerCommand extends AbstractCommand implements S
|
||||
}
|
||||
|
||||
protected static int defaultWorkerThread() {
|
||||
return Runtime.getRuntime().availableProcessors() * 4;
|
||||
return Runtime.getRuntime().availableProcessors() * 8;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ public class StandAloneCommand extends AbstractServerCommand {
|
||||
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
|
||||
private String tenantId;
|
||||
|
||||
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to four times the number of available processors. Set it to 0 to avoid starting a worker.")
|
||||
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to eight times the number of available processors. Set it to 0 to avoid starting a worker.")
|
||||
private int workerThread = defaultWorkerThread();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
|
||||
|
||||
@@ -22,7 +22,7 @@ public class WorkerCommand extends AbstractServerCommand {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to four times the number of available processors")
|
||||
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to eight times the number of available processors")
|
||||
private int thread = defaultWorkerThread();
|
||||
|
||||
@Option(names = {"-g", "--worker-group"}, description = "The worker group key, must match the regex [a-zA-Z0-9_-]+ (EE only)")
|
||||
|
||||
@@ -162,7 +162,15 @@ public class FileChangedEventListener {
|
||||
}
|
||||
|
||||
} catch (NoSuchFileException e) {
|
||||
log.error("File not found: {}", entry, e);
|
||||
log.warn("File not found: {}, deleting it", entry, e);
|
||||
// the file might have been deleted while reading so if not found we try to delete the flow
|
||||
flows.stream()
|
||||
.filter(flow -> flow.getPath().equals(filePath.toString()))
|
||||
.findFirst()
|
||||
.ifPresent(flowWithPath -> {
|
||||
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
|
||||
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
|
||||
});
|
||||
} catch (IOException e) {
|
||||
log.error("Error reading file: {}", entry, e);
|
||||
}
|
||||
|
||||
@@ -122,12 +122,13 @@ public class JsonSchemaGenerator {
|
||||
if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
|
||||
List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
|
||||
.map(JsonNode::asText)
|
||||
.toList();
|
||||
.collect(Collectors.toList());
|
||||
|
||||
properties.fields().forEachRemaining(e -> {
|
||||
int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
|
||||
if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
|
||||
requiredPropsNode.remove(indexInRequiredArray);
|
||||
requiredFieldValues.remove(indexInRequiredArray);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
package io.kestra.core.models;
|
||||
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public record Label(@NotNull String key, @NotNull String value) {
|
||||
@@ -29,11 +28,36 @@ public record Label(@NotNull String key, @NotNull String value) {
|
||||
* @return the nested {@link Map}.
|
||||
*/
|
||||
public static Map<String, Object> toNestedMap(List<Label> labels) {
|
||||
Map<String, Object> asMap = labels.stream()
|
||||
return MapUtils.flattenToNestedMap(toMap(labels));
|
||||
}
|
||||
|
||||
/**
|
||||
* Static helper method for converting a list of labels to a flat map.
|
||||
* Key order is kept.
|
||||
*
|
||||
* @param labels The list of {@link Label} to be converted.
|
||||
* @return the flat {@link Map}.
|
||||
*/
|
||||
public static Map<String, String> toMap(@Nullable List<Label> labels) {
|
||||
if (labels == null || labels.isEmpty()) return Collections.emptyMap();
|
||||
return labels.stream()
|
||||
.filter(label -> label.value() != null && label.key() != null)
|
||||
// using an accumulator in case labels with the same key exists: the first is kept
|
||||
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> first));
|
||||
return MapUtils.flattenToNestedMap(asMap);
|
||||
// using an accumulator in case labels with the same key exists: the second is kept
|
||||
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
|
||||
}
|
||||
|
||||
/**
|
||||
* Static helper method for deduplicating a list of labels by their key.
|
||||
* Value of the last key occurrence is kept.
|
||||
*
|
||||
* @param labels The list of {@link Label} to be deduplicated.
|
||||
* @return the deduplicated {@link List}.
|
||||
*/
|
||||
public static List<Label> deduplicate(@Nullable List<Label> labels) {
|
||||
if (labels == null || labels.isEmpty()) return Collections.emptyList();
|
||||
return toMap(labels).entrySet().stream()
|
||||
.map(entry -> new Label(entry.getKey(), entry.getValue()))
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -6,9 +6,9 @@ import com.fasterxml.jackson.annotation.JsonValue;
|
||||
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
||||
import io.kestra.core.models.dashboards.filters.*;
|
||||
import io.kestra.core.utils.Enums;
|
||||
import java.util.ArrayList;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -49,42 +49,27 @@ public record QueryFilter(
|
||||
PREFIX
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private List<Object> asValues(Object value) {
|
||||
return value instanceof String valueStr ? Arrays.asList(valueStr.split(",")) : (List<Object>) value;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T extends Enum<T>> AbstractFilter<T> toDashboardFilterBuilder(T field, Object value) {
|
||||
switch (this.operation) {
|
||||
case EQUALS:
|
||||
return EqualTo.<T>builder().field(field).value(value).build();
|
||||
case NOT_EQUALS:
|
||||
return NotEqualTo.<T>builder().field(field).value(value).build();
|
||||
case GREATER_THAN:
|
||||
return GreaterThan.<T>builder().field(field).value(value).build();
|
||||
case LESS_THAN:
|
||||
return LessThan.<T>builder().field(field).value(value).build();
|
||||
case GREATER_THAN_OR_EQUAL_TO:
|
||||
return GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
|
||||
case LESS_THAN_OR_EQUAL_TO:
|
||||
return LessThanOrEqualTo.<T>builder().field(field).value(value).build();
|
||||
case IN:
|
||||
return In.<T>builder().field(field).values(asValues(value)).build();
|
||||
case NOT_IN:
|
||||
return NotIn.<T>builder().field(field).values(asValues(value)).build();
|
||||
case STARTS_WITH:
|
||||
return StartsWith.<T>builder().field(field).value(value.toString()).build();
|
||||
case ENDS_WITH:
|
||||
return EndsWith.<T>builder().field(field).value(value.toString()).build();
|
||||
case CONTAINS:
|
||||
return Contains.<T>builder().field(field).value(value.toString()).build();
|
||||
case REGEX:
|
||||
return Regex.<T>builder().field(field).value(value.toString()).build();
|
||||
case PREFIX:
|
||||
return Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported operation: " + this.operation);
|
||||
}
|
||||
return switch (this.operation) {
|
||||
case EQUALS -> EqualTo.<T>builder().field(field).value(value).build();
|
||||
case NOT_EQUALS -> NotEqualTo.<T>builder().field(field).value(value).build();
|
||||
case GREATER_THAN -> GreaterThan.<T>builder().field(field).value(value).build();
|
||||
case LESS_THAN -> LessThan.<T>builder().field(field).value(value).build();
|
||||
case GREATER_THAN_OR_EQUAL_TO -> GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
|
||||
case LESS_THAN_OR_EQUAL_TO -> LessThanOrEqualTo.<T>builder().field(field).value(value).build();
|
||||
case IN -> In.<T>builder().field(field).values(asValues(value)).build();
|
||||
case NOT_IN -> NotIn.<T>builder().field(field).values(asValues(value)).build();
|
||||
case STARTS_WITH -> StartsWith.<T>builder().field(field).value(value.toString()).build();
|
||||
case ENDS_WITH -> EndsWith.<T>builder().field(field).value(value.toString()).build();
|
||||
case CONTAINS -> Contains.<T>builder().field(field).value(value.toString()).build();
|
||||
case REGEX -> Regex.<T>builder().field(field).value(value.toString()).build();
|
||||
case PREFIX -> Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
|
||||
};
|
||||
}
|
||||
|
||||
public enum Field {
|
||||
@@ -154,6 +139,12 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
EXECUTION_ID("executionId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
CHILD_FILTER("childFilter") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
@@ -228,7 +219,7 @@ public record QueryFilter(
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.START_DATE,
|
||||
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL
|
||||
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL, Field.EXECUTION_ID
|
||||
);
|
||||
}
|
||||
},
|
||||
|
||||
@@ -25,6 +25,7 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
|
||||
import io.kestra.core.services.LabelService;
|
||||
import io.kestra.core.test.flow.TaskFixture;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import jakarta.annotation.Nullable;
|
||||
@@ -131,12 +132,12 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
* @param labels The Flow labels.
|
||||
* @return a new {@link Execution}.
|
||||
*/
|
||||
public static Execution newExecution(final Flow flow, final List<Label> labels) {
|
||||
public static Execution newExecution(final FlowInterface flow, final List<Label> labels) {
|
||||
return newExecution(flow, null, labels, Optional.empty());
|
||||
}
|
||||
|
||||
public List<Label> getLabels() {
|
||||
return Optional.ofNullable(this.labels).orElse(new ArrayList<>());
|
||||
return ListUtils.emptyOnNull(this.labels);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -181,8 +182,22 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Customization of Lombok-generated builder.
|
||||
*/
|
||||
public static class ExecutionBuilder {
|
||||
|
||||
/**
|
||||
* Enforce unique values of {@link Label} when using the builder.
|
||||
*
|
||||
* @param labels The labels.
|
||||
* @return Deduplicated labels.
|
||||
*/
|
||||
public ExecutionBuilder labels(List<Label> labels) {
|
||||
this.labels = Label.deduplicate(labels);
|
||||
return this;
|
||||
}
|
||||
|
||||
void prebuild() {
|
||||
this.originalId = this.id;
|
||||
this.metadata = ExecutionMetadata.builder()
|
||||
@@ -231,7 +246,6 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
}
|
||||
|
||||
public Execution withLabels(List<Label> labels) {
|
||||
|
||||
return new Execution(
|
||||
this.tenantId,
|
||||
this.id,
|
||||
@@ -241,7 +255,7 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
this.taskRunList,
|
||||
this.inputs,
|
||||
this.outputs,
|
||||
labels,
|
||||
Label.deduplicate(labels),
|
||||
this.variables,
|
||||
this.state,
|
||||
this.parentId,
|
||||
@@ -400,7 +414,7 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
*
|
||||
* @param resolvedTasks normal tasks
|
||||
* @param resolvedErrors errors tasks
|
||||
* @param resolvedErrors finally tasks
|
||||
* @param resolvedFinally finally tasks
|
||||
* @return the flow we need to follow
|
||||
*/
|
||||
public List<ResolvedTask> findTaskDependingFlowState(
|
||||
|
||||
@@ -38,6 +38,8 @@ public abstract class AbstractFlow implements FlowInterface {
|
||||
@Min(value = 1)
|
||||
Integer revision;
|
||||
|
||||
String description;
|
||||
|
||||
@Valid
|
||||
List<Input<?>> inputs;
|
||||
|
||||
|
||||
@@ -61,13 +61,10 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
}
|
||||
});
|
||||
|
||||
String description;
|
||||
|
||||
Map<String, Object> variables;
|
||||
|
||||
@Valid
|
||||
@NotEmpty
|
||||
|
||||
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
|
||||
List<Task> tasks;
|
||||
|
||||
@@ -125,7 +122,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
AbstractRetry retry;
|
||||
|
||||
@Valid
|
||||
@PluginProperty(beta = true)
|
||||
@PluginProperty
|
||||
List<SLA> sla;
|
||||
|
||||
public Stream<String> allTypes() {
|
||||
|
||||
@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Optional;
|
||||
@@ -57,6 +58,7 @@ public interface FlowId {
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@EqualsAndHashCode
|
||||
class Default implements FlowId {
|
||||
private final String tenantId;
|
||||
private final String namespace;
|
||||
|
||||
@@ -31,6 +31,8 @@ public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface
|
||||
|
||||
Pattern YAML_REVISION_MATCHER = Pattern.compile("(?m)^revision: \\d+\n?");
|
||||
|
||||
String getDescription();
|
||||
|
||||
boolean isDisabled();
|
||||
|
||||
boolean isDeleted();
|
||||
|
||||
@@ -116,7 +116,7 @@ public class State {
|
||||
}
|
||||
|
||||
public Instant maxDate() {
|
||||
if (this.histories.size() == 0) {
|
||||
if (this.histories.isEmpty()) {
|
||||
return Instant.now();
|
||||
}
|
||||
|
||||
@@ -124,7 +124,7 @@ public class State {
|
||||
}
|
||||
|
||||
public Instant minDate() {
|
||||
if (this.histories.size() == 0) {
|
||||
if (this.histories.isEmpty()) {
|
||||
return Instant.now();
|
||||
}
|
||||
|
||||
@@ -173,6 +173,11 @@ public class State {
|
||||
return this.current.isBreakpoint();
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public boolean isQueued() {
|
||||
return this.current.isQueued();
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public boolean isRetrying() {
|
||||
return this.current.isRetrying();
|
||||
@@ -206,6 +211,14 @@ public class State {
|
||||
return this.histories.get(this.histories.size() - 2).state.isPaused();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if the execution has failed, then was restarted.
|
||||
* This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
|
||||
*/
|
||||
public boolean failedThenRestarted() {
|
||||
return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
|
||||
}
|
||||
|
||||
@Introspected
|
||||
public enum Type {
|
||||
CREATED,
|
||||
@@ -264,6 +277,10 @@ public class State {
|
||||
return this == Type.KILLED;
|
||||
}
|
||||
|
||||
public boolean isQueued(){
|
||||
return this == Type.QUEUED;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return states that are terminal to an execution
|
||||
*/
|
||||
|
||||
@@ -20,9 +20,8 @@ public class FileInput extends Input<URI> {
|
||||
|
||||
private static final String DEFAULT_EXTENSION = ".upl";
|
||||
|
||||
@Builder.Default
|
||||
@Deprecated(since = "0.24", forRemoval = true)
|
||||
public String extension = DEFAULT_EXTENSION;
|
||||
public String extension;
|
||||
|
||||
@Override
|
||||
public void validate(URI input) throws ConstraintViolationException {
|
||||
@@ -33,6 +32,7 @@ public class FileInput extends Input<URI> {
|
||||
String res = inputs.stream()
|
||||
.filter(in -> in instanceof FileInput)
|
||||
.filter(in -> in.getId().equals(fileName))
|
||||
.filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
|
||||
.map(flowInput -> ((FileInput) flowInput).getExtension())
|
||||
.findFirst()
|
||||
.orElse(FileInput.DEFAULT_EXTENSION);
|
||||
|
||||
@@ -222,6 +222,7 @@ public class Trigger extends TriggerContext implements HasUID {
|
||||
}
|
||||
// If trigger is a schedule and execution ended after the next execution date
|
||||
else if (abstractTrigger instanceof Schedule schedule &&
|
||||
this.getNextExecutionDate() != null &&
|
||||
execution.getState().getEndDate().get().isAfter(this.getNextExecutionDate().toInstant())
|
||||
) {
|
||||
RecoverMissedSchedules recoverMissedSchedules = Optional.ofNullable(schedule.getRecoverMissedSchedules())
|
||||
|
||||
@@ -28,6 +28,7 @@ public interface QueueFactoryInterface {
|
||||
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
|
||||
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
|
||||
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
|
||||
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
|
||||
|
||||
QueueInterface<Execution> execution();
|
||||
|
||||
@@ -58,4 +59,6 @@ public interface QueueFactoryInterface {
|
||||
QueueInterface<SubflowExecutionResult> subflowExecutionResult();
|
||||
|
||||
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
|
||||
|
||||
QueueInterface<ExecutionRunning> executionRunning();
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
|
||||
void delete(String consumerGroup, T message) throws QueueException;
|
||||
|
||||
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
|
||||
return receive((String) null, consumer);
|
||||
return receive(null, consumer, false);
|
||||
}
|
||||
|
||||
default Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {
|
||||
|
||||
@@ -27,8 +27,6 @@ public class QueueService {
|
||||
return ((Executor) object).getExecution().getId();
|
||||
} else if (object.getClass() == MetricEntry.class) {
|
||||
return null;
|
||||
} else if (object.getClass() == ExecutionRunning.class) {
|
||||
return ((ExecutionRunning) object).getExecution().getId();
|
||||
} else if (object.getClass() == SubflowExecutionEnd.class) {
|
||||
return ((SubflowExecutionEnd) object).getParentExecutionId();
|
||||
} else {
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
package io.kestra.core.queues;
|
||||
|
||||
import java.io.Serial;
|
||||
|
||||
public class UnsupportedMessageException extends QueueException {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public UnsupportedMessageException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
@@ -161,7 +161,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
|
||||
}
|
||||
|
||||
List<Execution> lastExecutions(
|
||||
@Nullable String tenantId,
|
||||
String tenantId,
|
||||
@Nullable List<FlowFilter> flows
|
||||
);
|
||||
}
|
||||
|
||||
@@ -81,9 +81,7 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
|
||||
|
||||
Flux<LogEntry> findAsync(
|
||||
@Nullable String tenantId,
|
||||
@Nullable String namespace,
|
||||
@Nullable Level minLevel,
|
||||
ZonedDateTime startDate
|
||||
List<QueryFilter> filters
|
||||
);
|
||||
|
||||
Flux<LogEntry> findAllAsync(@Nullable String tenantId);
|
||||
@@ -96,5 +94,7 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
|
||||
|
||||
void deleteByQuery(String tenantId, String namespace, String flowId, String triggerId);
|
||||
|
||||
void deleteByFilters(String tenantId, List<QueryFilter> filters);
|
||||
|
||||
int deleteByQuery(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate);
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -11,7 +12,7 @@ import lombok.With;
|
||||
@Value
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class ExecutionRunning {
|
||||
public class ExecutionRunning implements HasUID {
|
||||
String tenantId;
|
||||
|
||||
@NotNull
|
||||
@@ -26,6 +27,7 @@ public class ExecutionRunning {
|
||||
@With
|
||||
ConcurrencyState concurrencyState;
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId, this.execution.getId());
|
||||
}
|
||||
|
||||
@@ -86,7 +86,7 @@ public class Executor {
|
||||
|
||||
public Boolean canBeProcessed() {
|
||||
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
|
||||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint());
|
||||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
|
||||
}
|
||||
|
||||
public Executor withFlow(FlowWithSource flow) {
|
||||
|
||||
@@ -102,49 +102,39 @@ public class ExecutorService {
|
||||
return this.flowExecutorInterface;
|
||||
}
|
||||
|
||||
public Executor checkConcurrencyLimit(Executor executor, FlowInterface flow, Execution execution, long count) {
|
||||
// if above the limit, handle concurrency limit based on its behavior
|
||||
if (count >= flow.getConcurrency().getLimit()) {
|
||||
public ExecutionRunning processExecutionRunning(FlowInterface flow, int runningCount, ExecutionRunning executionRunning) {
|
||||
// if concurrency was removed, it can be null as we always get the latest flow definition
|
||||
if (flow.getConcurrency() != null && runningCount >= flow.getConcurrency().getLimit()) {
|
||||
return switch (flow.getConcurrency().getBehavior()) {
|
||||
case QUEUE -> {
|
||||
var newExecution = execution.withState(State.Type.QUEUED);
|
||||
|
||||
ExecutionRunning executionRunning = ExecutionRunning.builder()
|
||||
.tenantId(flow.getTenantId())
|
||||
.namespace(flow.getNamespace())
|
||||
.flowId(flow.getId())
|
||||
.execution(newExecution)
|
||||
.concurrencyState(ExecutionRunning.ConcurrencyState.QUEUED)
|
||||
.build();
|
||||
|
||||
// when max concurrency is reached, we throttle the execution and stop processing
|
||||
logService.logExecution(
|
||||
newExecution,
|
||||
executionRunning.getExecution(),
|
||||
Level.INFO,
|
||||
"Flow is queued due to concurrency limit exceeded, {} running(s)",
|
||||
count
|
||||
"Execution is queued due to concurrency limit exceeded, {} running(s)",
|
||||
runningCount
|
||||
);
|
||||
// return the execution queued
|
||||
yield executor
|
||||
.withExecutionRunning(executionRunning)
|
||||
.withExecution(newExecution, "checkConcurrencyLimit");
|
||||
var newExecution = executionRunning.getExecution().withState(State.Type.QUEUED);
|
||||
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
|
||||
yield executionRunning
|
||||
.withExecution(newExecution)
|
||||
.withConcurrencyState(ExecutionRunning.ConcurrencyState.QUEUED);
|
||||
}
|
||||
case CANCEL ->
|
||||
executor.withExecution(execution.withState(State.Type.CANCELLED), "checkConcurrencyLimit");
|
||||
executionRunning
|
||||
.withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
|
||||
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
|
||||
case FAIL ->
|
||||
executor.withException(new IllegalStateException("Flow is FAILED due to concurrency limit exceeded"), "checkConcurrencyLimit");
|
||||
executionRunning
|
||||
.withExecution(executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded")).getExecution())
|
||||
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
// if under the limit, update the executor with a RUNNING ExecutionRunning to track them
|
||||
var executionRunning = new ExecutionRunning(
|
||||
flow.getTenantId(),
|
||||
flow.getNamespace(),
|
||||
flow.getId(),
|
||||
executor.getExecution(),
|
||||
ExecutionRunning.ConcurrencyState.RUNNING
|
||||
);
|
||||
return executor.withExecutionRunning(executionRunning);
|
||||
// if under the limit, run it!
|
||||
return executionRunning
|
||||
.withExecution(executionRunning.getExecution().withState(State.Type.RUNNING))
|
||||
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
|
||||
}
|
||||
|
||||
public Executor process(Executor executor) {
|
||||
|
||||
@@ -286,18 +286,10 @@ public class FlowableUtils {
|
||||
|
||||
// start as many tasks as we have concurrency slots
|
||||
return collect.values().stream()
|
||||
.map(resolvedTasks -> filterCreated(resolvedTasks, taskRuns, parentTaskRun))
|
||||
.map(resolvedTasks -> resolveSequentialNexts(execution, resolvedTasks, null, null, parentTaskRun))
|
||||
.filter(resolvedTasks -> !resolvedTasks.isEmpty())
|
||||
.limit(concurrencySlots)
|
||||
.map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
|
||||
.toList();
|
||||
}
|
||||
|
||||
private static List<ResolvedTask> filterCreated(List<ResolvedTask> tasks, List<TaskRun> taskRuns, TaskRun parentTaskRun) {
|
||||
return tasks.stream()
|
||||
.filter(resolvedTask -> taskRuns.stream()
|
||||
.noneMatch(taskRun -> FlowableUtils.isTaskRunFor(resolvedTask, taskRun, parentTaskRun))
|
||||
)
|
||||
.map(resolvedTasks -> resolvedTasks.getFirst())
|
||||
.toList();
|
||||
}
|
||||
|
||||
|
||||
@@ -764,6 +764,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
|
||||
|
||||
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
|
||||
|
||||
this.workerTaskResultQueue.emit(workerTaskResult);
|
||||
|
||||
// upload the cache file, hash may not be present if we didn't succeed in computing it
|
||||
@@ -796,6 +797,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
// If it's a message too big, we remove the outputs
|
||||
failed = failed.withOutputs(Variables.empty());
|
||||
}
|
||||
if (e instanceof UnsupportedMessageException) {
|
||||
// we expect the offending char is in the output so we remove it
|
||||
failed = failed.withOutputs(Variables.empty());
|
||||
}
|
||||
WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
|
||||
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
|
||||
contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
|
||||
@@ -818,7 +823,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
private Optional<String> hashTask(RunContext runContext, Task task) {
|
||||
try {
|
||||
var map = JacksonMapper.toMap(task);
|
||||
var rMap = runContext.render(map);
|
||||
// If there are task provided variables, rendering the task may fail.
|
||||
// The best we can do is to add a fake 'workingDir' as it's an often added variables,
|
||||
// and it should not be part of the task hash.
|
||||
Map<String, Object> variables = Map.of("workingDir", "workingDir");
|
||||
var rMap = runContext.render(map, variables);
|
||||
var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
|
||||
MessageDigest digest = MessageDigest.getInstance("SHA-256");
|
||||
digest.update(json);
|
||||
|
||||
@@ -102,6 +102,19 @@ public abstract class AbstractDate {
|
||||
}
|
||||
|
||||
if (value instanceof Long longValue) {
|
||||
if(value.toString().length() == 13) {
|
||||
return Instant.ofEpochMilli(longValue).atZone(zoneId);
|
||||
}else if(value.toString().length() == 19 ){
|
||||
if(value.toString().endsWith("000")){
|
||||
long seconds = longValue/1_000_000_000;
|
||||
int nanos = (int) (longValue%1_000_000_000);
|
||||
return Instant.ofEpochSecond(seconds,nanos).atZone(zoneId);
|
||||
}else{
|
||||
long milliseconds = longValue/1_000_000;
|
||||
int micros = (int) (longValue%1_000_000);
|
||||
return Instant.ofEpochMilli(milliseconds).atZone(zoneId).withNano(micros*1000);
|
||||
}
|
||||
}
|
||||
return Instant.ofEpochSecond(longValue).atZone(zoneId);
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.kestra.core.events.CrudEventType;
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
@@ -318,7 +319,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
}
|
||||
|
||||
synchronized (this) { // we need a sync block as we read then update so we should not do it in multiple threads concurrently
|
||||
List<Trigger> triggers = triggerState.findAllForAllTenants();
|
||||
Map<String, Trigger> triggers = triggerState.findAllForAllTenants().stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
|
||||
|
||||
flows
|
||||
.stream()
|
||||
@@ -328,7 +329,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.flatMap(flow -> flow.getTriggers().stream().filter(trigger -> trigger instanceof WorkerTriggerInterface).map(trigger -> new FlowAndTrigger(flow, trigger)))
|
||||
.distinct()
|
||||
.forEach(flowAndTrigger -> {
|
||||
Optional<Trigger> trigger = triggers.stream().filter(t -> t.uid().equals(Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger()))).findFirst(); // must have one or none
|
||||
String triggerUid = Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger());
|
||||
Optional<Trigger> trigger = Optional.ofNullable(triggers.get(triggerUid));
|
||||
if (trigger.isEmpty()) {
|
||||
RunContext runContext = runContextFactory.of(flowAndTrigger.flow(), flowAndTrigger.trigger());
|
||||
ConditionContext conditionContext = conditionService.conditionContext(runContext, flowAndTrigger.flow(), null);
|
||||
@@ -467,9 +469,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
private List<FlowWithTriggers> computeSchedulable(List<FlowWithSource> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
|
||||
List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
|
||||
List<String> flowIds = flows.stream().map(FlowId::uidWithoutRevision).toList();
|
||||
Map<String, Trigger> triggerById = triggerContextsToEvaluate.stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
|
||||
|
||||
// delete trigger which flow has been deleted
|
||||
triggerContextsToEvaluate.stream()
|
||||
.filter(trigger -> !flows.stream().map(FlowId::uidWithoutRevision).toList().contains(FlowId.uid(trigger)))
|
||||
.filter(trigger -> !flowIds.contains(FlowId.uid(trigger)))
|
||||
.forEach(trigger -> {
|
||||
try {
|
||||
this.triggerState.delete(trigger);
|
||||
@@ -491,12 +496,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.map(abstractTrigger -> {
|
||||
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
|
||||
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
|
||||
Trigger triggerContext = null;
|
||||
Trigger lastTrigger = triggerContextsToEvaluate
|
||||
.stream()
|
||||
.filter(triggerContextToFind -> triggerContextToFind.uid().equals(Trigger.uid(flow, abstractTrigger)))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
Trigger triggerContext;
|
||||
Trigger lastTrigger = triggerById.get(Trigger.uid(flow, abstractTrigger));
|
||||
// If a trigger is not found in triggers to evaluate, then we ignore it
|
||||
if (lastTrigger == null) {
|
||||
return null;
|
||||
|
||||
@@ -250,9 +250,10 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
|
||||
stateLock.lock();
|
||||
// Optional callback to be executed at the end.
|
||||
Runnable returnCallback = null;
|
||||
|
||||
localServiceState = localServiceState(service);
|
||||
try {
|
||||
localServiceState = localServiceState(service);
|
||||
|
||||
|
||||
if (localServiceState == null) {
|
||||
return null; // service has been unregistered.
|
||||
}
|
||||
@@ -301,7 +302,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
|
||||
// Update the local instance
|
||||
this.serviceRegistry.register(localServiceState.with(remoteInstance));
|
||||
} catch (Exception e) {
|
||||
final ServiceInstance localInstance = localServiceState(service).instance();
|
||||
final ServiceInstance localInstance = localServiceState.instance();
|
||||
log.error("[Service id={}, type='{}', hostname='{}'] Failed to update state to {}. Error: {}",
|
||||
localInstance.uid(),
|
||||
localInstance.type(),
|
||||
@@ -317,7 +318,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
|
||||
returnCallback.run();
|
||||
}
|
||||
}
|
||||
return localServiceState(service).instance();
|
||||
return Optional.ofNullable(localServiceState(service)).map(LocalServiceState::instance).orElse(null);
|
||||
}
|
||||
|
||||
private void mayDisableStateUpdate(final Service service, final ServiceInstance instance) {
|
||||
@@ -371,9 +372,11 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
|
||||
final Service service,
|
||||
final ServiceInstance instance,
|
||||
final boolean isLivenessEnabled) {
|
||||
// Never shutdown STANDALONE server or WEB_SERVER service.
|
||||
if (instance.server().type().equals(ServerInstance.Type.STANDALONE) ||
|
||||
instance.is(ServiceType.WEBSERVER)) {
|
||||
// Never shutdown STANDALONE server or WEBSERVER and INDEXER services.
|
||||
if (ServerInstance.Type.STANDALONE.equals(instance.server().type()) ||
|
||||
instance.is(ServiceType.INDEXER) ||
|
||||
instance.is(ServiceType.WEBSERVER)
|
||||
) {
|
||||
// Force the RUNNING state.
|
||||
return Optional.of(instance.state(Service.ServiceState.RUNNING, now, null));
|
||||
}
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
package io.kestra.core.server;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import io.kestra.core.utils.Enums;
|
||||
|
||||
/**
|
||||
* Supported Kestra's service types.
|
||||
*/
|
||||
@@ -9,4 +12,14 @@ public enum ServiceType {
|
||||
SCHEDULER,
|
||||
WEBSERVER,
|
||||
WORKER,
|
||||
INVALID;
|
||||
|
||||
@JsonCreator
|
||||
public static ServiceType fromString(final String value) {
|
||||
try {
|
||||
return Enums.getForNameIgnoreCase(value, ServiceType.class, INVALID);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return INVALID;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.FlowWithException;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.models.tasks.RunnableTask;
|
||||
import io.kestra.core.models.topologies.FlowTopology;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
@@ -51,7 +52,6 @@ import java.util.stream.StreamSupport;
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class FlowService {
|
||||
|
||||
@Inject
|
||||
Optional<FlowRepositoryInterface> flowRepository;
|
||||
|
||||
@@ -236,6 +236,7 @@ public class FlowService {
|
||||
}
|
||||
|
||||
List<String> warnings = new ArrayList<>(checkValidSubflows(flow, tenantId));
|
||||
|
||||
List<io.kestra.plugin.core.trigger.Flow> flowTriggers = ListUtils.emptyOnNull(flow.getTriggers()).stream()
|
||||
.filter(io.kestra.plugin.core.trigger.Flow.class::isInstance)
|
||||
.map(io.kestra.plugin.core.trigger.Flow.class::cast)
|
||||
@@ -246,6 +247,21 @@ public class FlowService {
|
||||
}
|
||||
});
|
||||
|
||||
// add warning for runnable properties (timeout, workerGroup, taskCache) when used not in a runnable
|
||||
flow.allTasksWithChilds().forEach(task -> {
|
||||
if (!(task instanceof RunnableTask<?>)) {
|
||||
if (task.getTimeout() != null) {
|
||||
warnings.add("The task '" + task.getId() + "' cannot use the 'timeout' property as it's only relevant for runnable tasks.");
|
||||
}
|
||||
if (task.getTaskCache() != null) {
|
||||
warnings.add("The task '" + task.getId() + "' cannot use the 'taskCache' property as it's only relevant for runnable tasks.");
|
||||
}
|
||||
if (task.getWorkerGroup() != null) {
|
||||
warnings.add("The task '" + task.getId() + "' cannot use the 'workerGroup' property as it's only relevant for runnable tasks.");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return warnings;
|
||||
}
|
||||
|
||||
@@ -531,29 +547,26 @@ public class FlowService {
|
||||
throw noRepositoryException();
|
||||
}
|
||||
|
||||
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly);
|
||||
return expandAll ? recursiveFlowTopology(tenant, namespace, id, destinationOnly) : flowTopologies.stream();
|
||||
return expandAll ? recursiveFlowTopology(new ArrayList<>(), tenant, namespace, id, destinationOnly) : flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly).stream();
|
||||
}
|
||||
|
||||
private Stream<FlowTopology> recursiveFlowTopology(String tenantId, String namespace, String flowId, boolean destinationOnly) {
|
||||
private Stream<FlowTopology> recursiveFlowTopology(List<FlowId> flowIds, String tenantId, String namespace, String id, boolean destinationOnly) {
|
||||
if (flowTopologyRepository.isEmpty()) {
|
||||
throw noRepositoryException();
|
||||
}
|
||||
|
||||
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, flowId, destinationOnly);
|
||||
List<FlowTopology> subTopologies = flowTopologies.stream()
|
||||
// filter on destination is not the current node to avoid an infinite loop
|
||||
.filter(topology -> !(topology.getDestination().getTenantId().equals(tenantId) && topology.getDestination().getNamespace().equals(namespace) && topology.getDestination().getId().equals(flowId)))
|
||||
.toList();
|
||||
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, id, destinationOnly);
|
||||
|
||||
if (subTopologies.isEmpty()) {
|
||||
FlowId flowId = FlowId.of(tenantId, namespace, id, null);
|
||||
if (flowIds.contains(flowId)) {
|
||||
return flowTopologies.stream();
|
||||
} else {
|
||||
return Stream.concat(flowTopologies.stream(), subTopologies.stream()
|
||||
.map(topology -> topology.getDestination())
|
||||
// recursively fetch child nodes
|
||||
.flatMap(destination -> recursiveFlowTopology(destination.getTenantId(), destination.getNamespace(), destination.getId(), destinationOnly)));
|
||||
}
|
||||
flowIds.add(flowId);
|
||||
|
||||
return flowTopologies.stream()
|
||||
.flatMap(topology -> Stream.of(topology.getDestination(), topology.getSource()))
|
||||
// recursively fetch child nodes
|
||||
.flatMap(node -> recursiveFlowTopology(flowIds, node.getTenantId(), node.getNamespace(), node.getId(), destinationOnly));
|
||||
}
|
||||
|
||||
private IllegalStateException noRepositoryException() {
|
||||
|
||||
@@ -1,37 +1,193 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class Either<L, R> {
|
||||
private final Optional<L> left;
|
||||
private final Optional<R> right;
|
||||
|
||||
private Either(Optional<L> left, Optional<R> right) {
|
||||
this.left = left;
|
||||
this.right = right;
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple {@link Either} monad type.
|
||||
*
|
||||
* @param <L> the {@link Left} type.
|
||||
* @param <R> the {@link Right} type.
|
||||
*/
|
||||
public abstract sealed class Either<L, R> permits Either.Left, Either.Right {
|
||||
|
||||
public static <L, R> Either<L, R> left(L value) {
|
||||
return new Either<>(Optional.ofNullable(value), Optional.empty());
|
||||
return new Left<>(value);
|
||||
}
|
||||
|
||||
public boolean isLeft() {
|
||||
return this.left.isPresent();
|
||||
}
|
||||
|
||||
public L getLeft() {
|
||||
return this.left.get();
|
||||
}
|
||||
|
||||
|
||||
public static <L, R> Either<L, R> right(R value) {
|
||||
return new Either<>(Optional.empty(), Optional.ofNullable(value));
|
||||
return new Right<>(value);
|
||||
}
|
||||
|
||||
public boolean isRight() {
|
||||
return this.right.isPresent();
|
||||
|
||||
/**
|
||||
* Returns {@code true} if this is a {@link Left}, {@code false} otherwise.
|
||||
*/
|
||||
public abstract boolean isLeft();
|
||||
|
||||
/**
|
||||
* Returns {@code true} if this is a {@link Right}, {@code false} otherwise.
|
||||
*/
|
||||
public abstract boolean isRight();
|
||||
|
||||
/**
|
||||
* Returns the left value.
|
||||
*
|
||||
* @throws NoSuchElementException if is not left.
|
||||
*/
|
||||
public abstract L getLeft();
|
||||
|
||||
/**
|
||||
* Returns the right value.
|
||||
*
|
||||
* @throws NoSuchElementException if is not right.
|
||||
*/
|
||||
public abstract R getRight();
|
||||
|
||||
public LeftProjection<L, R> left() {
|
||||
return new LeftProjection<>(this);
|
||||
}
|
||||
|
||||
public R getRight() {
|
||||
return this.right.get();
|
||||
|
||||
public RightProjection<L, R> right() {
|
||||
return new RightProjection<>(this);
|
||||
}
|
||||
}
|
||||
|
||||
public <T> T fold(final Function<L, T> fl, final Function<R, T> fr) {
|
||||
return isLeft() ? fl.apply(getLeft()) : fr.apply(getRight());
|
||||
}
|
||||
|
||||
public static final class Left<L, R> extends Either<L, R> {
|
||||
|
||||
private final L value;
|
||||
|
||||
private Left(L value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@code true}.
|
||||
*/
|
||||
@Override
|
||||
public boolean isLeft() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@code false}.
|
||||
*/
|
||||
@Override
|
||||
public boolean isRight() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public L getLeft() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public R getRight() {
|
||||
throw new NoSuchElementException("This is Left");
|
||||
}
|
||||
}
|
||||
|
||||
public static final class Right<L, R> extends Either<L, R> {
|
||||
|
||||
private final R value;
|
||||
|
||||
private Right(R value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@code false}.
|
||||
*/
|
||||
@Override
|
||||
public boolean isLeft() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@code true}.
|
||||
*/
|
||||
@Override
|
||||
public boolean isRight() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public L getLeft() {
|
||||
throw new NoSuchElementException("This is Right");
|
||||
}
|
||||
|
||||
@Override
|
||||
public R getRight() {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
public static class LeftProjection<L, R> {
|
||||
|
||||
private final Either<L, R> either;
|
||||
|
||||
LeftProjection(final Either<L, R> either) {
|
||||
Objects.requireNonNull(either, "either can't be null");
|
||||
this.either = either;
|
||||
}
|
||||
|
||||
public boolean exists() {
|
||||
return either.isLeft();
|
||||
}
|
||||
|
||||
public L get() {
|
||||
return either.getLeft();
|
||||
}
|
||||
|
||||
public <LL> Either<LL, R> map(final Function<? super L, ? extends LL> fn) {
|
||||
if (either.isLeft()) return Either.left(fn.apply(either.getLeft()));
|
||||
else return Either.right(either.getRight());
|
||||
}
|
||||
|
||||
public <LL> Either<LL, R> flatMap(final Function<? super L, Either<LL, R>> fn) {
|
||||
if (either.isLeft()) return fn.apply(either.getLeft());
|
||||
else return Either.right(either.getRight());
|
||||
}
|
||||
|
||||
public Optional<L> toOptional() {
|
||||
return exists() ? Optional.of(either.getLeft()) : Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
public static class RightProjection<L, R> {
|
||||
|
||||
private final Either<L, R> either;
|
||||
|
||||
RightProjection(final Either<L, R> either) {
|
||||
Objects.requireNonNull(either, "either can't be null");
|
||||
this.either = either;
|
||||
}
|
||||
|
||||
public boolean exists() {
|
||||
return either.isRight();
|
||||
}
|
||||
|
||||
public R get() {
|
||||
return either.getRight();
|
||||
}
|
||||
|
||||
public <RR> Either<L, RR> map(final Function<? super R, ? extends RR> fn) {
|
||||
if (either.isRight()) return Either.right(fn.apply(either.getRight()));
|
||||
else return Either.left(either.getLeft());
|
||||
}
|
||||
|
||||
public <RR> Either<L, RR> flatMap(final Function<? super R, Either<L, RR>> fn) {
|
||||
if (either.isRight()) return fn.apply(either.getRight());
|
||||
else return Either.left(either.getLeft());
|
||||
}
|
||||
|
||||
public Optional<R> toOptional() {
|
||||
return exists() ? Optional.of(either.getRight()) : Optional.empty();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@@ -118,6 +119,25 @@ public final class Enums {
|
||||
));
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert an object to a list of a specific enum.
|
||||
* @param value the object to convert to list of enum.
|
||||
* @param enumClass the class of the enum to convert to.
|
||||
* @return A list of the corresponding enum type
|
||||
* @param <T> The type of the enum.
|
||||
* @throws IllegalArgumentException If the value does not match any enum value.
|
||||
*/
|
||||
public static <T extends Enum<T>> List<T> fromList(Object value, Class<T> enumClass) {
|
||||
return switch (value) {
|
||||
case List<?> list when !list.isEmpty() && enumClass.isInstance(list.getFirst()) -> (List<T>) list;
|
||||
case List<?> list when !list.isEmpty() && list.getFirst() instanceof String ->
|
||||
list.stream().map(item -> Enum.valueOf(enumClass, item.toString().toUpperCase())).collect(Collectors.toList());
|
||||
case Enum<?> enumValue when enumClass.isInstance(enumValue) -> List.of(enumClass.cast(enumValue));
|
||||
case String stringValue -> List.of(Enum.valueOf(enumClass, stringValue.toUpperCase()));
|
||||
default -> throw new IllegalArgumentException("Field requires a " + enumClass.getSimpleName() + " or List<" + enumClass.getSimpleName() + "> value");
|
||||
};
|
||||
}
|
||||
|
||||
private Enums() {
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,4 +55,20 @@ public class ListUtils {
|
||||
|
||||
return newList;
|
||||
}
|
||||
|
||||
public static List<?> convertToList(Object object){
|
||||
if (object instanceof List<?> list) {
|
||||
return list;
|
||||
} else {
|
||||
throw new IllegalArgumentException("%s in not an instance of List".formatted(object.getClass()));
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> convertToListString(Object object){
|
||||
if (object instanceof List<?> list && (list.isEmpty() || list.getFirst() instanceof String)) {
|
||||
return (List<String>) list;
|
||||
} else {
|
||||
throw new IllegalArgumentException("%s in not an instance of List of String".formatted(object));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -169,7 +169,7 @@ public class MapUtils {
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method nested a flattened map.
|
||||
* Utility method that nests a flattened map.
|
||||
*
|
||||
* @param flatMap the flattened map.
|
||||
* @return the nested map.
|
||||
@@ -203,4 +203,44 @@ public class MapUtils {
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method that flatten a nested map.
|
||||
* <p>
|
||||
* NOTE: for simplicity, this method didn't allow to flatten maps with conflicting keys that would end up in different flatten keys,
|
||||
* this could be related later if needed by flattening {k1: k2: {k3: v1}, k1: {k4: v2}} to {k1.k2.k3: v1, k1.k4: v2} is prohibited for now.
|
||||
*
|
||||
* @param nestedMap the nested map.
|
||||
* @return the flattened map.
|
||||
*
|
||||
* @throws IllegalArgumentException if any entry contains a map of more than one element.
|
||||
*/
|
||||
public static Map<String, Object> nestedToFlattenMap(@NotNull Map<String, Object> nestedMap) {
|
||||
Map<String, Object> result = new TreeMap<>();
|
||||
|
||||
for (Map.Entry<String, Object> entry : nestedMap.entrySet()) {
|
||||
if (entry.getValue() instanceof Map<?, ?> map) {
|
||||
Map.Entry<String, Object> flatten = flattenEntry(entry.getKey(), (Map<String, Object>) map);
|
||||
result.put(flatten.getKey(), flatten.getValue());
|
||||
} else {
|
||||
result.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static Map.Entry<String, Object> flattenEntry(String key, Map<String, Object> value) {
|
||||
if (value.size() > 1) {
|
||||
throw new IllegalArgumentException("You cannot flatten a map with an entry that is a map of more than one element, conflicting key: " + key);
|
||||
}
|
||||
|
||||
Map.Entry<String, Object> entry = value.entrySet().iterator().next();
|
||||
String newKey = key + "." + entry.getKey();
|
||||
Object newValue = entry.getValue();
|
||||
if (newValue instanceof Map<?, ?> map) {
|
||||
return flattenEntry(newKey, (Map<String, Object>) map);
|
||||
} else {
|
||||
return Map.entry(newKey, newValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
package io.kestra.core.validations;
|
||||
|
||||
import io.micronaut.context.annotation.Context;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Enforces validation rules upon the application configuration.
|
||||
*/
|
||||
@Slf4j
|
||||
@Context
|
||||
public class AppConfigValidator {
|
||||
private static final String KESTRA_URL_KEY = "kestra.url";
|
||||
|
||||
private final Environment environment;
|
||||
|
||||
@Inject
|
||||
public AppConfigValidator(Environment environment) {
|
||||
this.environment = environment;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
void validate() {
|
||||
final List<Boolean> validationResults = List.of(
|
||||
isKestraUrlValid()
|
||||
);
|
||||
|
||||
if (validationResults.contains(false)) {
|
||||
throw new AppConfigException("Invalid configuration");
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isKestraUrlValid() {
|
||||
if (!environment.containsProperty(KESTRA_URL_KEY)) {
|
||||
return true;
|
||||
}
|
||||
final String rawUrl = environment.getProperty(KESTRA_URL_KEY, String.class).orElseThrow();
|
||||
final URL url;
|
||||
|
||||
try {
|
||||
url = URI.create(rawUrl).toURL();
|
||||
} catch (IllegalArgumentException | MalformedURLException e) {
|
||||
log.error(
|
||||
"Value of the '{}' configuration property must be a valid URL - e.g. https://your.company.com",
|
||||
KESTRA_URL_KEY
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!List.of("http", "https").contains(url.getProtocol())) {
|
||||
log.error(
|
||||
"Value of the '{}' configuration property must contain either HTTP or HTTPS scheme - e.g. https://your.company.com",
|
||||
KESTRA_URL_KEY
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public static class AppConfigException extends RuntimeException {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public AppConfigException(String errorMessage) {
|
||||
super(errorMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -54,9 +54,10 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it.");
|
||||
}
|
||||
|
||||
List<Task> allTasks = value.allTasksWithChilds();
|
||||
|
||||
// tasks unique id
|
||||
List<String> taskIds = value.allTasksWithChilds()
|
||||
.stream()
|
||||
List<String> taskIds = allTasks.stream()
|
||||
.map(Task::getId)
|
||||
.toList();
|
||||
|
||||
@@ -72,8 +73,8 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
violations.add("Duplicate trigger id with name [" + String.join(", ", duplicateIds) + "]");
|
||||
}
|
||||
|
||||
value.allTasksWithChilds()
|
||||
.stream().filter(task -> task instanceof ExecutableTask<?> executableTask
|
||||
allTasks.stream()
|
||||
.filter(task -> task instanceof ExecutableTask<?> executableTask
|
||||
&& value.getId().equals(executableTask.subflowId().flowId())
|
||||
&& value.getNamespace().equals(executableTask.subflowId().namespace()))
|
||||
.forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));
|
||||
@@ -102,7 +103,7 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
.map(input -> Pattern.compile("\\{\\{\\s*inputs." + input.getId() + "\\s*\\}\\}"))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
List<String> invalidTasks = value.allTasks()
|
||||
List<String> invalidTasks = allTasks.stream()
|
||||
.filter(task -> checkObjectFieldsWithPatterns(task, inputsWithMinusPatterns))
|
||||
.map(task -> task.getId())
|
||||
.collect(Collectors.toList());
|
||||
@@ -112,12 +113,12 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
|
||||
" [" + String.join(", ", invalidTasks) + "]");
|
||||
}
|
||||
|
||||
List<Pattern> outputsWithMinusPattern = value.allTasks()
|
||||
List<Pattern> outputsWithMinusPattern = allTasks.stream()
|
||||
.filter(output -> Optional.ofNullable(output.getId()).orElse("").contains("-"))
|
||||
.map(output -> Pattern.compile("\\{\\{\\s*outputs\\." + output.getId() + "\\.[^}]+\\s*\\}\\}"))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
invalidTasks = value.allTasks()
|
||||
invalidTasks = allTasks.stream()
|
||||
.filter(task -> checkObjectFieldsWithPatterns(task, outputsWithMinusPattern))
|
||||
.map(task -> task.getId())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
@@ -19,7 +19,6 @@ import lombok.experimental.SuperBuilder;
|
||||
@NoArgsConstructor
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
@EqualsAndHashCode
|
||||
//@TriggersDataFilterValidation
|
||||
@Schema(
|
||||
title = "Display Execution data in a dashboard chart.",
|
||||
description = "Execution data can be displayed in charts broken out by Namespace and filtered by State, for example."
|
||||
|
||||
@@ -111,8 +111,9 @@ public class Labels extends Task implements ExecutionUpdatableTask {
|
||||
})
|
||||
).collect(Collectors.toMap(
|
||||
Map.Entry::getKey,
|
||||
Map.Entry::getValue
|
||||
));
|
||||
Map.Entry::getValue,
|
||||
(first, second) -> second)
|
||||
);
|
||||
} else if (labels instanceof Map<?, ?> map) {
|
||||
labelsAsMap = map.entrySet()
|
||||
.stream()
|
||||
|
||||
@@ -208,48 +208,50 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
boolean isOutputsAllowed = runContext
|
||||
.<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
|
||||
.orElse(true);
|
||||
|
||||
final Output.OutputBuilder builder = Output.builder()
|
||||
.executionId(execution.getId())
|
||||
.state(execution.getState().getCurrent());
|
||||
|
||||
final Map<String, Object> subflowOutputs = Optional
|
||||
.ofNullable(flow.getOutputs())
|
||||
.map(outputs -> outputs
|
||||
.stream()
|
||||
.collect(Collectors.toMap(
|
||||
io.kestra.core.models.flows.Output::getId,
|
||||
io.kestra.core.models.flows.Output::getValue)
|
||||
)
|
||||
)
|
||||
.orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
|
||||
|
||||
VariablesService variablesService = ((DefaultRunContext) runContext).getApplicationContext().getBean(VariablesService.class);
|
||||
if (subflowOutputs != null) {
|
||||
try {
|
||||
Map<String, Object> outputs = runContext.render(subflowOutputs);
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
|
||||
if (flow.getOutputs() != null && flowInputOutput != null) {
|
||||
outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
|
||||
}
|
||||
builder.outputs(outputs);
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
|
||||
var state = State.Type.fail(this);
|
||||
Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
|
||||
taskRun = taskRun
|
||||
.withState(state)
|
||||
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
|
||||
.withOutputs(variables);
|
||||
if (this.wait) { // we only compute outputs if we wait for the subflow
|
||||
boolean isOutputsAllowed = runContext
|
||||
.<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
|
||||
.orElse(true);
|
||||
|
||||
return Optional.of(SubflowExecutionResult.builder()
|
||||
.executionId(execution.getId())
|
||||
.state(State.Type.FAILED)
|
||||
.parentTaskRun(taskRun)
|
||||
.build());
|
||||
final Map<String, Object> subflowOutputs = Optional
|
||||
.ofNullable(flow.getOutputs())
|
||||
.map(outputs -> outputs
|
||||
.stream()
|
||||
.collect(Collectors.toMap(
|
||||
io.kestra.core.models.flows.Output::getId,
|
||||
io.kestra.core.models.flows.Output::getValue)
|
||||
)
|
||||
)
|
||||
.orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
|
||||
|
||||
if (subflowOutputs != null) {
|
||||
try {
|
||||
Map<String, Object> outputs = runContext.render(subflowOutputs);
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
|
||||
if (flow.getOutputs() != null && flowInputOutput != null) {
|
||||
outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
|
||||
}
|
||||
builder.outputs(outputs);
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
|
||||
var state = State.Type.fail(this);
|
||||
Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
|
||||
taskRun = taskRun
|
||||
.withState(state)
|
||||
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
|
||||
.withOutputs(variables);
|
||||
|
||||
return Optional.of(SubflowExecutionResult.builder()
|
||||
.executionId(execution.getId())
|
||||
.state(State.Type.FAILED)
|
||||
.parentTaskRun(taskRun)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,11 +9,11 @@ import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.codehaus.commons.nullanalysis.NotNull;
|
||||
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
|
||||
<g clip-path="url(#clip0_1765_9330)">
|
||||
<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
|
||||
<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
|
||||
<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
|
||||
<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
|
||||
<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
|
||||
<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
|
||||
<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
|
||||
</g>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 2.1 KiB |
@@ -0,0 +1,11 @@
|
||||
<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
|
||||
<g clip-path="url(#clip0_1765_9330)">
|
||||
<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
|
||||
<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
|
||||
<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
|
||||
<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
|
||||
<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
|
||||
<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
|
||||
<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
|
||||
</g>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 2.1 KiB |
@@ -112,7 +112,7 @@ class JsonSchemaGeneratorTest {
|
||||
|
||||
var requiredWithDefault = definitions.get("io.kestra.core.docs.JsonSchemaGeneratorTest-RequiredWithDefault");
|
||||
assertThat(requiredWithDefault, is(notNullValue()));
|
||||
assertThat((List<String>) requiredWithDefault.get("required"), not(contains("requiredWithDefault")));
|
||||
assertThat((List<String>) requiredWithDefault.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
|
||||
|
||||
var properties = (Map<String, Map<String, Object>>) flow.get("properties");
|
||||
var listeners = properties.get("listeners");
|
||||
@@ -253,7 +253,7 @@ class JsonSchemaGeneratorTest {
|
||||
void requiredAreRemovedIfThereIsADefault() {
|
||||
Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, RequiredWithDefault.class);
|
||||
assertThat(generate, is(not(nullValue())));
|
||||
assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault")));
|
||||
assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
|
||||
assertThat((List<String>) generate.get("required"), containsInAnyOrder("requiredWithNoDefault"));
|
||||
}
|
||||
|
||||
@@ -466,6 +466,11 @@ class JsonSchemaGeneratorTest {
|
||||
@Builder.Default
|
||||
private Property<TaskWithEnum.TestClass> requiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test").build());
|
||||
|
||||
@PluginProperty
|
||||
@NotNull
|
||||
@Builder.Default
|
||||
private Property<TaskWithEnum.TestClass> anotherRequiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test2").build());
|
||||
|
||||
@PluginProperty
|
||||
@NotNull
|
||||
private Property<TaskWithEnum.TestClass> requiredWithNoDefault;
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
package io.kestra.core.models;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class LabelTest {
|
||||
|
||||
@Test
|
||||
@@ -15,9 +16,8 @@ class LabelTest {
|
||||
new Label(Label.CORRELATION_ID, "id"))
|
||||
);
|
||||
|
||||
Assertions.assertEquals(
|
||||
Map.of("system", Map.of("username", "test", "correlationId", "id")),
|
||||
result
|
||||
assertThat(result).isEqualTo(
|
||||
Map.of("system", Map.of("username", "test", "correlationId", "id"))
|
||||
);
|
||||
}
|
||||
|
||||
@@ -29,9 +29,48 @@ class LabelTest {
|
||||
new Label(Label.CORRELATION_ID, "id"))
|
||||
);
|
||||
|
||||
Assertions.assertEquals(
|
||||
Map.of("system", Map.of("username", "test1", "correlationId", "id")),
|
||||
result
|
||||
assertThat(result).isEqualTo(
|
||||
Map.of("system", Map.of("username", "test2", "correlationId", "id"))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetMapGivenDistinctLabels() {
|
||||
Map<String, String> result = Label.toMap(List.of(
|
||||
new Label(Label.USERNAME, "test"),
|
||||
new Label(Label.CORRELATION_ID, "id"))
|
||||
);
|
||||
|
||||
assertThat(result).isEqualTo(
|
||||
Map.of(Label.USERNAME, "test", Label.CORRELATION_ID, "id")
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetMapGivenDuplicateLabels() {
|
||||
Map<String, String> result = Label.toMap(List.of(
|
||||
new Label(Label.USERNAME, "test1"),
|
||||
new Label(Label.USERNAME, "test2"),
|
||||
new Label(Label.CORRELATION_ID, "id"))
|
||||
);
|
||||
|
||||
assertThat(result).isEqualTo(
|
||||
Map.of(Label.USERNAME, "test2", Label.CORRELATION_ID, "id")
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldDuplicateLabelsWithKeyOrderKept() {
|
||||
List<Label> result = Label.deduplicate(List.of(
|
||||
new Label(Label.USERNAME, "test1"),
|
||||
new Label(Label.USERNAME, "test2"),
|
||||
new Label(Label.CORRELATION_ID, "id"),
|
||||
new Label(Label.USERNAME, "test3"))
|
||||
);
|
||||
|
||||
assertThat(result).containsExactly(
|
||||
new Label(Label.USERNAME, "test3"),
|
||||
new Label(Label.CORRELATION_ID, "id")
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -94,6 +94,14 @@ public class QueryFilterTest {
|
||||
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.ENDS_WITH).build(), Resource.LOG),
|
||||
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.CONTAINS).build(), Resource.LOG),
|
||||
|
||||
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.EQUALS).build(), Resource.LOG),
|
||||
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.NOT_EQUALS).build(), Resource.LOG),
|
||||
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.IN).build(), Resource.LOG),
|
||||
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.NOT_IN).build(), Resource.LOG),
|
||||
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.STARTS_WITH).build(), Resource.LOG),
|
||||
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.ENDS_WITH).build(), Resource.LOG),
|
||||
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.CONTAINS).build(), Resource.LOG),
|
||||
|
||||
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.EQUALS).build(), Resource.EXECUTION),
|
||||
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.NOT_EQUALS).build(), Resource.EXECUTION),
|
||||
|
||||
@@ -204,6 +212,13 @@ public class QueryFilterTest {
|
||||
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.REGEX).build(), Resource.LOG),
|
||||
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.PREFIX).build(), Resource.LOG),
|
||||
|
||||
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.GREATER_THAN).build(), Resource.LOG),
|
||||
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.LESS_THAN).build(), Resource.LOG),
|
||||
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.LOG),
|
||||
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.LOG),
|
||||
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.REGEX).build(), Resource.LOG),
|
||||
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.PREFIX).build(), Resource.LOG),
|
||||
|
||||
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.GREATER_THAN).build(), Resource.EXECUTION),
|
||||
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.LESS_THAN).build(), Resource.EXECUTION),
|
||||
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.models.executions;
|
||||
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import io.kestra.core.models.flows.State;
|
||||
|
||||
@@ -157,7 +158,58 @@ class ExecutionTest {
|
||||
.labels(List.of(new Label("test", "test-value")))
|
||||
.build();
|
||||
|
||||
assertThat(execution.getLabels().size()).isEqualTo(1);
|
||||
assertThat(execution.getLabels().getFirst()).isEqualTo(new Label("test", "test-value"));
|
||||
assertThat(execution.getLabels()).containsExactly(new Label("test", "test-value"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void labelsGetDeduplicated() {
|
||||
final List<Label> duplicatedLabels = List.of(
|
||||
new Label("test", "value1"),
|
||||
new Label("test", "value2")
|
||||
);
|
||||
|
||||
final Execution executionWithLabels = Execution.builder()
|
||||
.build()
|
||||
.withLabels(duplicatedLabels);
|
||||
assertThat(executionWithLabels.getLabels()).containsExactly(new Label("test", "value2"));
|
||||
|
||||
final Execution executionBuilder = Execution.builder()
|
||||
.labels(duplicatedLabels)
|
||||
.build();
|
||||
assertThat(executionBuilder.getLabels()).containsExactly(new Label("test", "value2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled("Solve label deduplication on instantization")
|
||||
void labelsGetDeduplicatedOnNewInstance() {
|
||||
final List<Label> duplicatedLabels = List.of(
|
||||
new Label("test", "value1"),
|
||||
new Label("test", "value2")
|
||||
);
|
||||
|
||||
final Execution executionNew = new Execution(
|
||||
"foo",
|
||||
"id",
|
||||
"namespace",
|
||||
"flowId",
|
||||
1,
|
||||
Collections.emptyList(),
|
||||
Map.of(),
|
||||
Map.of(),
|
||||
duplicatedLabels,
|
||||
Map.of(),
|
||||
State.of(State.Type.SUCCESS, Collections.emptyList()),
|
||||
"parentId",
|
||||
"originalId",
|
||||
null,
|
||||
false,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
assertThat(executionNew.getLabels()).containsExactly(new Label("test", "value2"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,6 +44,7 @@ import java.time.ZonedDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.models.flows.FlowScope.USER;
|
||||
@@ -198,6 +199,7 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
return Stream.of(
|
||||
QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.TRIGGER_ID).value("test").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.WORKER_ID).value("test").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build()
|
||||
@@ -740,4 +742,16 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
|
||||
assertThat(executions.size()).isEqualTo(0L);
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void shouldReturnLastExecutionsWhenInputsAreNull() {
|
||||
inject();
|
||||
|
||||
List<Execution> lastExecutions = executionRepository.lastExecutions(MAIN_TENANT, null);
|
||||
|
||||
assertThat(lastExecutions).isNotEmpty();
|
||||
Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet());
|
||||
assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -160,6 +160,7 @@ public abstract class AbstractFlowRepositoryTest {
|
||||
QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("executionTriggerId").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.TRIGGER_ID).value("test").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.WORKER_ID).value("test").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),
|
||||
|
||||
@@ -34,6 +34,7 @@ import static io.kestra.core.models.flows.FlowScope.SYSTEM;
|
||||
import static io.kestra.core.models.flows.FlowScope.USER;
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatReflectiveOperationException;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
@KestraTest
|
||||
@@ -42,11 +43,15 @@ public abstract class AbstractLogRepositoryTest {
|
||||
protected LogRepositoryInterface logRepository;
|
||||
|
||||
protected static LogEntry.LogEntryBuilder logEntry(Level level) {
|
||||
return logEntry(level, IdUtils.create());
|
||||
}
|
||||
|
||||
protected static LogEntry.LogEntryBuilder logEntry(Level level, String executionId) {
|
||||
return LogEntry.builder()
|
||||
.flowId("flowId")
|
||||
.namespace("io.kestra.unittest")
|
||||
.taskId("taskId")
|
||||
.executionId(IdUtils.create())
|
||||
.executionId(executionId)
|
||||
.taskRunId(IdUtils.create())
|
||||
.attemptNumber(0)
|
||||
.timestamp(Instant.now())
|
||||
@@ -60,13 +65,36 @@ public abstract class AbstractLogRepositoryTest {
|
||||
@ParameterizedTest
|
||||
@MethodSource("filterCombinations")
|
||||
void should_find_all(QueryFilter filter){
|
||||
logRepository.save(logEntry(Level.INFO).build());
|
||||
logRepository.save(logEntry(Level.INFO, "executionId").build());
|
||||
|
||||
ArrayListTotal<LogEntry> entries = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
|
||||
|
||||
assertThat(entries).hasSize(1);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("filterCombinations")
|
||||
void should_find_async(QueryFilter filter){
|
||||
logRepository.save(logEntry(Level.INFO, "executionId").build());
|
||||
|
||||
Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, List.of(filter));
|
||||
|
||||
List<LogEntry> logEntries = find.collectList().block();
|
||||
assertThat(logEntries).hasSize(1);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("filterCombinations")
|
||||
void should_delete_with_filter(QueryFilter filter){
|
||||
logRepository.save(logEntry(Level.INFO, "executionId").build());
|
||||
|
||||
logRepository.deleteByFilters(MAIN_TENANT, List.of(filter));
|
||||
|
||||
assertThat(logRepository.findAllAsync(MAIN_TENANT).collectList().block()).isEmpty();
|
||||
}
|
||||
|
||||
|
||||
|
||||
static Stream<QueryFilter> filterCombinations() {
|
||||
return Stream.of(
|
||||
QueryFilter.builder().field(Field.QUERY).value("flowId").operation(Op.EQUALS).build(),
|
||||
@@ -105,6 +133,13 @@ public abstract class AbstractLogRepositoryTest {
|
||||
QueryFilter.builder().field(Field.TRIGGER_ID).value("Id").operation(Op.ENDS_WITH).build(),
|
||||
QueryFilter.builder().field(Field.TRIGGER_ID).value(List.of("triggerId")).operation(Op.IN).build(),
|
||||
QueryFilter.builder().field(Field.TRIGGER_ID).value(List.of("anotherId")).operation(Op.NOT_IN).build(),
|
||||
QueryFilter.builder().field(Field.EXECUTION_ID).value("executionId").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.EXECUTION_ID).value("anotherId").operation(Op.NOT_EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.EXECUTION_ID).value("xecution").operation(Op.CONTAINS).build(),
|
||||
QueryFilter.builder().field(Field.EXECUTION_ID).value("execution").operation(Op.STARTS_WITH).build(),
|
||||
QueryFilter.builder().field(Field.EXECUTION_ID).value("Id").operation(Op.ENDS_WITH).build(),
|
||||
QueryFilter.builder().field(Field.EXECUTION_ID).value(List.of("executionId")).operation(Op.IN).build(),
|
||||
QueryFilter.builder().field(Field.EXECUTION_ID).value(List.of("anotherId")).operation(Op.NOT_IN).build(),
|
||||
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.ERROR).operation(Op.NOT_EQUALS).build()
|
||||
);
|
||||
@@ -284,32 +319,6 @@ public abstract class AbstractLogRepositoryTest {
|
||||
assertThat(find.size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAsync() {
|
||||
logRepository.save(logEntry(Level.INFO).build());
|
||||
logRepository.save(logEntry(Level.ERROR).build());
|
||||
logRepository.save(logEntry(Level.WARN).build());
|
||||
logRepository.save(logEntry(Level.INFO).executionKind(ExecutionKind.TEST).build()); // should not be visible here
|
||||
|
||||
ZonedDateTime startDate = ZonedDateTime.now().minusSeconds(1);
|
||||
|
||||
Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unittest", Level.INFO, startDate);
|
||||
List<LogEntry> logEntries = find.collectList().block();
|
||||
assertThat(logEntries).hasSize(3);
|
||||
|
||||
find = logRepository.findAsync(MAIN_TENANT, null, Level.ERROR, startDate);
|
||||
logEntries = find.collectList().block();
|
||||
assertThat(logEntries).hasSize(1);
|
||||
|
||||
find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unused", Level.INFO, startDate);
|
||||
logEntries = find.collectList().block();
|
||||
assertThat(logEntries).hasSize(0);
|
||||
|
||||
find = logRepository.findAsync(MAIN_TENANT, null, Level.INFO, startDate.plusSeconds(2));
|
||||
logEntries = find.collectList().block();
|
||||
assertThat(logEntries).hasSize(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAllAsync() {
|
||||
logRepository.save(logEntry(Level.INFO).build());
|
||||
|
||||
@@ -101,6 +101,7 @@ public abstract class AbstractTriggerRepositoryTest {
|
||||
QueryFilter.builder().field(Field.STATE).value(State.Type.RUNNING).operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),
|
||||
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build()
|
||||
|
||||
@@ -417,6 +417,18 @@ public abstract class AbstractRunnerTest {
|
||||
flowConcurrencyCaseTest.flowConcurrencyCancelPause();
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"})
|
||||
protected void flowConcurrencyWithForEachItem() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
|
||||
void concurrencyQueueRestarted() throws Exception {
|
||||
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ExecuteFlow("flows/valids/executable-fail.yml")
|
||||
void badExecutable(Execution execution) {
|
||||
|
||||
@@ -8,18 +8,29 @@ import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.services.ExecutionService;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Files;
|
||||
import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
@@ -28,7 +39,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
@Singleton
|
||||
public class FlowConcurrencyCaseTest {
|
||||
@Inject
|
||||
private RunnerUtils runnerUtils;
|
||||
private StorageInterface storageInterface;
|
||||
|
||||
@Inject
|
||||
protected RunnerUtils runnerUtils;
|
||||
|
||||
@Inject
|
||||
private FlowInputOutput flowIO;
|
||||
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
@@ -37,6 +54,9 @@ public class FlowConcurrencyCaseTest {
|
||||
@Named(QueueFactoryInterface.EXECUTION_NAMED)
|
||||
protected QueueInterface<Execution> executionQueue;
|
||||
|
||||
@Inject
|
||||
private ExecutionService executionService;
|
||||
|
||||
public void flowConcurrencyCancel() throws TimeoutException, QueueException, InterruptedException {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
|
||||
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel");
|
||||
@@ -237,4 +257,109 @@ public class FlowConcurrencyCaseTest {
|
||||
assertThat(secondExecutionResult.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
|
||||
assertThat(secondExecutionResult.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.CANCELLED);
|
||||
}
|
||||
|
||||
public void flowConcurrencyWithForEachItem() throws TimeoutException, QueueException, InterruptedException, URISyntaxException, IOException {
|
||||
URI file = storageUpload();
|
||||
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
|
||||
Execution forEachItem = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-for-each-item", null,
|
||||
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs), Duration.ofSeconds(5));
|
||||
assertThat(forEachItem.getState().getCurrent()).isEqualTo(Type.RUNNING);
|
||||
|
||||
Set<String> executionIds = new HashSet<>();
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
|
||||
if ("flow-concurrency-queue".equals(e.getLeft().getFlowId()) && e.getLeft().getState().isRunning()) {
|
||||
executionIds.add(e.getLeft().getId());
|
||||
}
|
||||
});
|
||||
|
||||
// wait a little to be sure there are not too many executions started
|
||||
Thread.sleep(500);
|
||||
|
||||
assertThat(executionIds).hasSize(1);
|
||||
receive.blockLast();
|
||||
|
||||
Execution terminated = runnerUtils.awaitExecution(e -> e.getId().equals(forEachItem.getId()) && e.getState().isTerminated(), () -> {}, Duration.ofSeconds(10));
|
||||
assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
|
||||
}
|
||||
|
||||
public void flowConcurrencyQueueRestarted() throws Exception {
|
||||
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
|
||||
Flow flow = flowRepository
|
||||
.findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", Optional.empty())
|
||||
.orElseThrow();
|
||||
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
|
||||
executionQueue.emit(execution2);
|
||||
|
||||
assertThat(execution1.getState().isRunning()).isTrue();
|
||||
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
|
||||
|
||||
var executionResult1 = new AtomicReference<Execution>();
|
||||
var executionResult2 = new AtomicReference<Execution>();
|
||||
|
||||
CountDownLatch latch1 = new CountDownLatch(2);
|
||||
AtomicReference<Execution> failedExecution = new AtomicReference<>();
|
||||
CountDownLatch latch2 = new CountDownLatch(1);
|
||||
CountDownLatch latch3 = new CountDownLatch(1);
|
||||
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
|
||||
if (e.getLeft().getId().equals(execution1.getId())) {
|
||||
executionResult1.set(e.getLeft());
|
||||
if (e.getLeft().getState().getCurrent() == Type.FAILED) {
|
||||
failedExecution.set(e.getLeft());
|
||||
latch1.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
if (e.getLeft().getId().equals(execution2.getId())) {
|
||||
executionResult2.set(e.getLeft());
|
||||
if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
|
||||
latch2.countDown();
|
||||
}
|
||||
if (e.getLeft().getState().getCurrent() == Type.FAILED) {
|
||||
latch3.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(latch2.await(1, TimeUnit.MINUTES));
|
||||
assertThat(failedExecution.get()).isNotNull();
|
||||
// here the first fail and the second is now running.
|
||||
// we restart the first one, it should be queued then fail again.
|
||||
Execution restarted = executionService.restart(failedExecution.get(), null);
|
||||
executionQueue.emit(restarted);
|
||||
|
||||
assertTrue(latch3.await(1, TimeUnit.MINUTES));
|
||||
assertTrue(latch1.await(1, TimeUnit.MINUTES));
|
||||
receive.blockLast();
|
||||
|
||||
assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(Type.FAILED);
|
||||
// it should have been queued after restarted
|
||||
assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED)).isTrue();
|
||||
assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.QUEUED)).isTrue();
|
||||
assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(Type.FAILED);
|
||||
assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
|
||||
assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
|
||||
assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
|
||||
}
|
||||
|
||||
private URI storageUpload() throws URISyntaxException, IOException {
|
||||
File tempFile = File.createTempFile("file", ".txt");
|
||||
|
||||
Files.write(tempFile.toPath(), content());
|
||||
|
||||
return storageInterface.put(
|
||||
MAIN_TENANT,
|
||||
null,
|
||||
new URI("/file/storage/file.txt"),
|
||||
new FileInputStream(tempFile)
|
||||
);
|
||||
}
|
||||
|
||||
private List<String> content() {
|
||||
return IntStream
|
||||
.range(0, 7)
|
||||
.mapToObj(value -> StringUtils.leftPad(value + "", 20))
|
||||
.toList();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -273,7 +273,7 @@ public class RestartCaseTest {
|
||||
|
||||
// wait
|
||||
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
|
||||
execution -> execution.getState().getCurrent() == State.Type.SUCCESS && execution.getId().equals(firstExecution.getId()),
|
||||
execution -> executionService.isTerminated(flow, execution) && execution.getState().isSuccess() && execution.getId().equals(firstExecution.getId()),
|
||||
throwRunnable(() -> {
|
||||
Execution restartedExec = executionService.restart(firstExecution, null);
|
||||
assertThat(restartedExec).isNotNull();
|
||||
|
||||
@@ -9,6 +9,7 @@ import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import org.junit.jupiter.api.RepeatedTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.event.Level;
|
||||
@@ -109,7 +110,8 @@ class RunContextLoggerTest {
|
||||
logger.info("test myawesomepassmyawesomepass myawesomepass myawesomepassmyawesomepass");
|
||||
logger.warn("test {}", URI.create("http://it-s.secret"));
|
||||
|
||||
matchingLog = TestsUtils.awaitLogs(logs, 3);
|
||||
// the 3 logs will create 4 log entries as exceptions stacktraces are logged separately at the TRACE level
|
||||
matchingLog = TestsUtils.awaitLogs(logs, 4);
|
||||
receive.blockLast();
|
||||
assertThat(matchingLog.stream().filter(logEntry -> logEntry.getLevel().equals(Level.DEBUG)).findFirst().orElseThrow().getMessage()).isEqualTo("test john@****** test");
|
||||
assertThat(matchingLog.stream().filter(logEntry -> logEntry.getLevel().equals(Level.TRACE)).findFirst().orElseThrow().getMessage()).contains("exception from doe.com");
|
||||
|
||||
@@ -5,6 +5,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.RunnableTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
@@ -18,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@@ -77,8 +79,12 @@ public class TaskCacheTest {
|
||||
@Plugin
|
||||
public static class CounterTask extends Task implements RunnableTask<CounterTask.Output> {
|
||||
|
||||
private String workingDir;
|
||||
|
||||
@Override
|
||||
public Output run(RunContext runContext) throws Exception {
|
||||
Map<String, Object> variables = Map.of("workingDir", runContext.workingDir().path().toString());
|
||||
runContext.render(this.workingDir, variables);
|
||||
return Output.builder()
|
||||
.counter(COUNTER.incrementAndGet())
|
||||
.build();
|
||||
|
||||
@@ -71,6 +71,8 @@ class DateFilterTest {
|
||||
{{ "2013-09-08T17:19:12+02:00" | date(timeZone="Europe/Paris") }}
|
||||
{{ "2013-09-08T17:19:12" | date(timeZone="Europe/Paris") }}
|
||||
{{ "2013-09-08" | date(timeZone="Europe/Paris") }}
|
||||
{{ "08.09.2023" | date("yyyy-MM-dd", existingFormat="dd.MM.yyyy") }}
|
||||
{{ "08092023" | date("yyyy-MM-dd", existingFormat="ddMMyyyy") }}
|
||||
""",
|
||||
Map.of()
|
||||
);
|
||||
@@ -80,6 +82,8 @@ class DateFilterTest {
|
||||
2013-09-08T17:19:12.000000+02:00
|
||||
2013-09-08T17:19:12.000000+02:00
|
||||
2013-09-08T00:00:00.000000+02:00
|
||||
2023-09-08
|
||||
2023-09-08
|
||||
""");
|
||||
}
|
||||
|
||||
@@ -171,7 +175,9 @@ class DateFilterTest {
|
||||
|
||||
render = variableRenderer.render("{{ now(format=\"sql_milli\") }}", ImmutableMap.of());
|
||||
|
||||
assertThat(render).isEqualTo(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
|
||||
// a millisecond can pass between the render and now so we can't assert on a precise to millisecond date
|
||||
assertThat(render).startsWith(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
|
||||
assertThat(render).hasSize(23);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -185,4 +191,41 @@ class DateFilterTest {
|
||||
|
||||
assertThat(render).isEqualTo("2013-09-07T17:19:12.123456+02:00");
|
||||
}
|
||||
|
||||
@Test
|
||||
void timestampDateFormat() throws IllegalVariableEvaluationException {
|
||||
String render =
|
||||
variableRenderer.render(
|
||||
"""
|
||||
{{ 1378653552 | date(format="iso_sec", timeZone="Europe/Paris") }}
|
||||
{{ 1378653552123 | date(format="iso_milli", timeZone="Europe/Paris") }}
|
||||
{{ 1378653552123 | date(timeZone="Europe/Paris") }}
|
||||
{{ 1378653552123 | date(format="iso_zoned_date_time", timeZone="Europe/Paris") }}
|
||||
{{ 1378653552123456000 | date(format="iso", timeZone="Europe/Paris") }}
|
||||
{{ 1378653552000123456 | date(format="iso", timeZone="Europe/Paris") }}
|
||||
{{ 1378653552 | date(format="sql_sec", timeZone="Europe/Paris") }}
|
||||
{{ 1378653552123 | date(format="sql_milli", timeZone="Europe/Paris") }}
|
||||
{{ 1378653552123456000 | date(format="sql", timeZone="Europe/Paris") }}
|
||||
{{ 1378653552000123456 | date(format="sql", timeZone="Europe/Paris") }}
|
||||
{{ 1378653552123 | date(format="sql_milli", timeZone="UTC") }}
|
||||
{{ "1378653552123" | number | date(format="sql_milli", timeZone="UTC") }}
|
||||
""",
|
||||
Map.of());
|
||||
|
||||
assertThat(render).isEqualTo("""
|
||||
2013-09-08T17:19:12+02:00
|
||||
2013-09-08T17:19:12.123+02:00
|
||||
2013-09-08T17:19:12.123000+02:00
|
||||
2013-09-08T17:19:12.123+02:00[Europe/Paris]
|
||||
2013-09-08T17:19:12.123456+02:00
|
||||
2013-09-08T17:19:12.123456+02:00
|
||||
2013-09-08 17:19:12
|
||||
2013-09-08 17:19:12.123
|
||||
2013-09-08 17:19:12.123456
|
||||
2013-09-08 17:19:12.123456
|
||||
2013-09-08 15:19:12.123
|
||||
2013-09-08 15:19:12.123
|
||||
""");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -372,4 +372,44 @@ class FlowServiceTest {
|
||||
|
||||
assertThat(exceptions.size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldReturnValidationForRunnablePropsOnFlowable() {
|
||||
// Given
|
||||
String source = """
|
||||
id: dolphin_164914
|
||||
namespace: company.team
|
||||
|
||||
tasks:
|
||||
- id: for
|
||||
type: io.kestra.plugin.core.flow.ForEach
|
||||
values: [1, 2, 3]
|
||||
workerGroup:
|
||||
key: toto
|
||||
timeout: PT10S
|
||||
taskCache:
|
||||
enabled: true
|
||||
tasks:
|
||||
- id: hello
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Hello World! 🚀
|
||||
workerGroup:
|
||||
key: toto
|
||||
timeout: PT10S
|
||||
taskCache:
|
||||
enabled: true
|
||||
""";
|
||||
|
||||
// When
|
||||
List<ValidateConstraintViolation> results = flowService.validate("my-tenant", source);
|
||||
|
||||
// Then
|
||||
assertThat(results).hasSize(1);
|
||||
assertThat(results.getFirst().getWarnings()).hasSize(3);
|
||||
assertThat(results.getFirst().getWarnings()).containsExactlyInAnyOrder(
|
||||
"The task 'for' cannot use the 'timeout' property as it's only relevant for runnable tasks.",
|
||||
"The task 'for' cannot use the 'taskCache' property as it's only relevant for runnable tasks.",
|
||||
"The task 'for' cannot use the 'workerGroup' property as it's only relevant for runnable tasks."
|
||||
);
|
||||
}
|
||||
}
|
||||
462
core/src/test/java/io/kestra/core/utils/EitherTest.java
Normal file
462
core/src/test/java/io/kestra/core/utils/EitherTest.java
Normal file
@@ -0,0 +1,462 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
class EitherTest {
|
||||
|
||||
@Test
|
||||
void shouldCreateLeftInstance() {
|
||||
// Given
|
||||
String leftValue = "error";
|
||||
|
||||
// When
|
||||
Either<String, Integer> either = Either.left(leftValue);
|
||||
|
||||
// Then
|
||||
assertThat(either).isInstanceOf(Either.Left.class);
|
||||
assertThat(either.isLeft()).isTrue();
|
||||
assertThat(either.isRight()).isFalse();
|
||||
assertThat(either.getLeft()).isEqualTo(leftValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateRightInstance() {
|
||||
// Given
|
||||
Integer rightValue = 42;
|
||||
|
||||
// When
|
||||
Either<String, Integer> either = Either.right(rightValue);
|
||||
|
||||
// Then
|
||||
assertThat(either).isInstanceOf(Either.Right.class);
|
||||
assertThat(either.isRight()).isTrue();
|
||||
assertThat(either.isLeft()).isFalse();
|
||||
assertThat(either.getRight()).isEqualTo(rightValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateLeftWithNullValue() {
|
||||
// When
|
||||
Either<String, Integer> either = Either.left(null);
|
||||
|
||||
// Then
|
||||
assertThat(either.isLeft()).isTrue();
|
||||
assertThat(either.getLeft()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateRightWithNullValue() {
|
||||
// When
|
||||
Either<String, Integer> either = Either.right(null);
|
||||
|
||||
// Then
|
||||
assertThat(either.isRight()).isTrue();
|
||||
assertThat(either.getRight()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftShouldReturnCorrectValues() {
|
||||
// Given
|
||||
String leftValue = "error message";
|
||||
Either<String, Integer> either = Either.left(leftValue);
|
||||
|
||||
// Then
|
||||
assertThat(either.isLeft()).isTrue();
|
||||
assertThat(either.isRight()).isFalse();
|
||||
assertThat(either.getLeft()).isEqualTo(leftValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftShouldThrowExceptionWhenGettingRightValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When/Then
|
||||
assertThatThrownBy(either::getRight)
|
||||
.isInstanceOf(NoSuchElementException.class)
|
||||
.hasMessage("This is Left");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightShouldReturnCorrectValues() {
|
||||
// Given
|
||||
Integer rightValue = 100;
|
||||
Either<String, Integer> either = Either.right(rightValue);
|
||||
|
||||
// Then
|
||||
assertThat(either.isRight()).isTrue();
|
||||
assertThat(either.isLeft()).isFalse();
|
||||
assertThat(either.getRight()).isEqualTo(rightValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightShouldThrowExceptionWhenGettingLeftValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When/Then
|
||||
assertThatThrownBy(either::getLeft)
|
||||
.isInstanceOf(NoSuchElementException.class)
|
||||
.hasMessage("This is Right");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldApplyLeftFunctionForLeftInstanceInFold() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
Function<String, String> leftFn = s -> "Left: " + s;
|
||||
Function<Integer, String> rightFn = i -> "Right: " + i;
|
||||
|
||||
// When
|
||||
String result = either.fold(leftFn, rightFn);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("Left: error");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldApplyRightFunctionForRightInstanceInFold() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
Function<String, String> leftFn = s -> "Left: " + s;
|
||||
Function<Integer, String> rightFn = i -> "Right: " + i;
|
||||
|
||||
// When
|
||||
String result = either.fold(leftFn, rightFn);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("Right: 42");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldHandleNullReturnValuesInFold() {
|
||||
// Given
|
||||
Either<String, Integer> leftEither = Either.left("error");
|
||||
Either<String, Integer> rightEither = Either.right(42);
|
||||
|
||||
// When
|
||||
String leftResult = leftEither.fold(s -> null, i -> "not null");
|
||||
String rightResult = rightEither.fold(s -> "not null", i -> null);
|
||||
|
||||
// Then
|
||||
assertThat(leftResult).isNull();
|
||||
assertThat(rightResult).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionShouldExistForLeftInstance() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either.LeftProjection<String, Integer> projection = either.left();
|
||||
|
||||
// Then
|
||||
assertThat(projection.exists()).isTrue();
|
||||
assertThat(projection.get()).isEqualTo("error");
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionShouldNotExistForRightInstance() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either.LeftProjection<String, Integer> projection = either.left();
|
||||
|
||||
// Then
|
||||
assertThat(projection.exists()).isFalse();
|
||||
assertThatThrownBy(projection::get)
|
||||
.isInstanceOf(NoSuchElementException.class)
|
||||
.hasMessage("This is Right");
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionMapShouldTransformLeftValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either<Integer, Integer> result = either.left().map(String::length);
|
||||
|
||||
// Then
|
||||
assertThat(result.isLeft()).isTrue();
|
||||
assertThat(result.getLeft()).isEqualTo(5);
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionMapShouldPreserveRightValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either<Integer, Integer> result = either.left().map(String::length);
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo(42);
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionFlatMapShouldTransformLeftValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either<Integer, Integer> result = either.left().flatMap(s -> Either.left(s.length()));
|
||||
|
||||
// Then
|
||||
assertThat(result.isLeft()).isTrue();
|
||||
assertThat(result.getLeft()).isEqualTo(5);
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionFlatMapShouldPreserveRightValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either<Integer, Integer> result = either.left().flatMap(s -> Either.left(s.length()));
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo(42);
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionFlatMapCanReturnRight() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either<String, Integer> result = either.left().flatMap(s -> Either.right(999));
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo(999);
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionToOptionalShouldReturnPresentForLeft() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Optional<String> optional = either.left().toOptional();
|
||||
|
||||
// Then
|
||||
assertThat(optional).isPresent();
|
||||
assertThat(optional.get()).isEqualTo("error");
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionToOptionalShouldReturnEmptyForRight() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Optional<String> optional = either.left().toOptional();
|
||||
|
||||
// Then
|
||||
assertThat(optional).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void leftProjectionConstructorShouldThrowForNullEither() {
|
||||
// When/Then
|
||||
assertThatThrownBy(() -> new Either.LeftProjection<>(null))
|
||||
.isInstanceOf(NullPointerException.class)
|
||||
.hasMessage("either can't be null");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionShouldExistForRightInstance() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either.RightProjection<String, Integer> projection = either.right();
|
||||
|
||||
// Then
|
||||
assertThat(projection.exists()).isTrue();
|
||||
assertThat(projection.get()).isEqualTo(42);
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionShouldNotExistForLeftInstance() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either.RightProjection<String, Integer> projection = either.right();
|
||||
|
||||
// Then
|
||||
assertThat(projection.exists()).isFalse();
|
||||
assertThatThrownBy(projection::get)
|
||||
.isInstanceOf(NoSuchElementException.class)
|
||||
.hasMessage("This is Left");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionMapShouldTransformRightValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either<String, String> result = either.right().map(Object::toString);
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo("42");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionMapShouldPreserveLeftValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either<String, String> result = either.right().map(Object::toString);
|
||||
|
||||
// Then
|
||||
assertThat(result.isLeft()).isTrue();
|
||||
assertThat(result.getLeft()).isEqualTo("error");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionFlatMapShouldTransformRightValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either<String, String> result = either.right().flatMap(i -> Either.right(i.toString()));
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo("42");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionFlatMapShouldPreserveLeftValue() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Either<String, String> result = either.right().flatMap(i -> Either.right(i.toString()));
|
||||
|
||||
// Then
|
||||
assertThat(result.isLeft()).isTrue();
|
||||
assertThat(result.getLeft()).isEqualTo("error");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionFlatMapCanReturnLeft() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Either<String, Integer> result = either.right().flatMap(i -> Either.left("converted"));
|
||||
|
||||
// Then
|
||||
assertThat(result.isLeft()).isTrue();
|
||||
assertThat(result.getLeft()).isEqualTo("converted");
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionToOptionalShouldReturnPresentForRight() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(42);
|
||||
|
||||
// When
|
||||
Optional<Integer> optional = either.right().toOptional();
|
||||
|
||||
// Then
|
||||
assertThat(optional).isPresent();
|
||||
assertThat(optional.get()).isEqualTo(42);
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionToOptionalShouldReturnEmptyForLeft() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.left("error");
|
||||
|
||||
// When
|
||||
Optional<Integer> optional = either.right().toOptional();
|
||||
|
||||
// Then
|
||||
assertThat(optional).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void rightProjectionConstructorShouldThrowForNullEither() {
|
||||
// When/Then
|
||||
assertThatThrownBy(() -> new Either.RightProjection<>(null))
|
||||
.isInstanceOf(NullPointerException.class)
|
||||
.hasMessage("either can't be null");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldHandleNullValuesInTransformations() {
|
||||
// Given
|
||||
Either<String, Integer> leftEither = Either.left(null);
|
||||
Either<String, Integer> rightEither = Either.right(null);
|
||||
|
||||
// When/Then
|
||||
assertThat(leftEither.left().map(s -> s == null ? "was null" : s).getLeft())
|
||||
.isEqualTo("was null");
|
||||
|
||||
assertThat(rightEither.right().map(i -> i == null ? "was null" : i.toString()).getRight())
|
||||
.isEqualTo("was null");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldHandleComplexTypeTransformations() {
|
||||
// Given
|
||||
Either<Exception, String> either = Either.right("hello world");
|
||||
|
||||
// When
|
||||
Either<String, Integer> result = either
|
||||
.left().map(Exception::getMessage)
|
||||
.right().map(String::length);
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo(11);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldChainTransformationsCorrectly() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(10);
|
||||
|
||||
// When
|
||||
Either<String, String> result = either
|
||||
.right().flatMap(i -> i > 5 ? Either.right(i * 2) : Either.left("too small"))
|
||||
.right().map(i -> "Result: " + i);
|
||||
|
||||
// Then
|
||||
assertThat(result.isRight()).isTrue();
|
||||
assertThat(result.getRight()).isEqualTo("Result: 20");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldHandleProjectionChainingWithErrorCases() {
|
||||
// Given
|
||||
Either<String, Integer> either = Either.right(3);
|
||||
|
||||
// When
|
||||
Either<String, String> result = either
|
||||
.right().flatMap(i -> i > 5 ? Either.right(i * 2) : Either.left("too small"))
|
||||
.right().map(i -> "Result: " + i);
|
||||
|
||||
// Then
|
||||
assertThat(result.isLeft()).isTrue();
|
||||
assertThat(result.getLeft()).isEqualTo("too small");
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,9 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import org.junit.Assert;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
import java.util.List;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -25,7 +28,7 @@ class EnumsTest {
|
||||
|
||||
@Test
|
||||
void shouldThrowExceptionGivenInvalidString() {
|
||||
Assertions.assertThrows(IllegalArgumentException.class, () -> {
|
||||
assertThrows(IllegalArgumentException.class, () -> {
|
||||
Enums.getForNameIgnoreCase("invalid", TestEnum.class);
|
||||
});
|
||||
}
|
||||
@@ -49,11 +52,22 @@ class EnumsTest {
|
||||
String invalidValue = "invalidValue";
|
||||
|
||||
// Act & Assert
|
||||
IllegalArgumentException exception = Assert.assertThrows(IllegalArgumentException.class, () ->
|
||||
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () ->
|
||||
Enums.fromString(invalidValue, mapping, "TestEnumWithValue")
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
void should_get_from_list(){
|
||||
assertThat(Enums.fromList(List.of(TestEnum.ENUM1, TestEnum.ENUM2), TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1, TestEnum.ENUM2));
|
||||
assertThat(Enums.fromList(List.of("ENUM1", "ENUM2"), TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1, TestEnum.ENUM2));
|
||||
assertThat(Enums.fromList(TestEnum.ENUM1, TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1));
|
||||
assertThat(Enums.fromList("ENUM1", TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1));
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () -> Enums.fromList(List.of("string1", "string2"), TestEnum.class));
|
||||
assertThrows(IllegalArgumentException.class, () -> Enums.fromList("non enum value", TestEnum.class));
|
||||
}
|
||||
|
||||
enum TestEnum {
|
||||
ENUM1, ENUM2
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
class ListUtilsTest {
|
||||
|
||||
@@ -36,4 +37,19 @@ class ListUtilsTest {
|
||||
assertThat(ListUtils.concat(list1, null)).isEqualTo(List.of("1", "2"));
|
||||
assertThat(ListUtils.concat(null, list2)).isEqualTo(List.of("3", "4"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void convertToList(){
|
||||
assertThat(ListUtils.convertToList(List.of(1, 2, 3))).isEqualTo(List.of(1, 2, 3));
|
||||
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToList("not a list"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void convertToListString(){
|
||||
assertThat(ListUtils.convertToListString(List.of("string1", "string2"))).isEqualTo(List.of("string1", "string2"));
|
||||
assertThat(ListUtils.convertToListString(List.of())).isEqualTo(List.of());
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString("not a list"));
|
||||
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString(List.of(1, 2, 3)));
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
class MapUtilsTest {
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -194,4 +195,23 @@ class MapUtilsTest {
|
||||
assertThat(results).hasSize(1);
|
||||
// due to ordering change on each JVM restart, the result map would be different as different entries will be skipped
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldFlattenANestedMap() {
|
||||
Map<String, Object> results = MapUtils.nestedToFlattenMap(Map.of("k1",Map.of("k2", Map.of("k3", "v1")), "k4", "v2"));
|
||||
|
||||
assertThat(results).hasSize(2);
|
||||
assertThat(results).containsAllEntriesOf(Map.of(
|
||||
"k1.k2.k3", "v1",
|
||||
"k4", "v2"
|
||||
));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldThrowIfNestedMapContainsMultipleEntries() {
|
||||
var exception = assertThrows(IllegalArgumentException.class,
|
||||
() -> MapUtils.nestedToFlattenMap(Map.of("k1", Map.of("k2", Map.of("k3", "v1"), "k4", "v2")))
|
||||
);
|
||||
assertThat(exception.getMessage()).isEqualTo("You cannot flatten a map with an entry that is a map of more than one element, conflicting key: k1");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,77 @@
|
||||
package io.kestra.core.validations;
|
||||
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.exceptions.BeanInstantiationException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThatCode;
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
|
||||
class AppConfigValidatorTest {
|
||||
|
||||
@Test
|
||||
void validateNoKestraUrl() {
|
||||
assertThatCode(() -> {
|
||||
try (ApplicationContext context = ApplicationContext.run()) {
|
||||
context.getBean(AppConfigValidator.class);
|
||||
}
|
||||
})
|
||||
.as("The bean got initialized properly including the PostConstruct validation")
|
||||
.doesNotThrowAnyException();
|
||||
}
|
||||
|
||||
@Test
|
||||
void validateValidKestraUrl() {
|
||||
assertThatCode(() -> {
|
||||
try (ApplicationContext context = ApplicationContext.builder()
|
||||
.deduceEnvironment(false)
|
||||
.properties(
|
||||
Map.of("kestra.url", "https://postgres-oss.preview.dev.kestra.io")
|
||||
)
|
||||
.start()
|
||||
) {
|
||||
context.getBean(AppConfigValidator.class);
|
||||
}
|
||||
})
|
||||
.as("The bean got initialized properly including the PostConstruct validation")
|
||||
.doesNotThrowAnyException();
|
||||
}
|
||||
|
||||
@Test
|
||||
void validateInvalidKestraUrl() {
|
||||
assertThatThrownBy(() -> {
|
||||
try (ApplicationContext context = ApplicationContext.builder()
|
||||
.deduceEnvironment(false)
|
||||
.properties(
|
||||
Map.of("kestra.url", "postgres-oss.preview.dev.kestra.io")
|
||||
)
|
||||
.start()
|
||||
) {
|
||||
context.getBean(AppConfigValidator.class);
|
||||
}
|
||||
})
|
||||
.as("The bean initialization failed at PostConstruct")
|
||||
.isInstanceOf(BeanInstantiationException.class)
|
||||
.hasMessageContaining("Invalid configuration");
|
||||
}
|
||||
|
||||
@Test
|
||||
void validateNonHttpKestraUrl() {
|
||||
assertThatThrownBy(() -> {
|
||||
try (ApplicationContext context = ApplicationContext.builder()
|
||||
.deduceEnvironment(false)
|
||||
.properties(
|
||||
Map.of("kestra.url", "ftp://postgres-oss.preview.dev.kestra.io")
|
||||
)
|
||||
.start()
|
||||
) {
|
||||
context.getBean(AppConfigValidator.class);
|
||||
}
|
||||
})
|
||||
.as("The bean initialization failed at PostConstruct")
|
||||
.isInstanceOf(BeanInstantiationException.class)
|
||||
.hasMessageContaining("Invalid configuration");
|
||||
}
|
||||
}
|
||||
@@ -54,4 +54,10 @@ class ForEachTest {
|
||||
assertThat(execution.findTaskRunsByTaskId("e1").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ExecuteFlow("flows/valids/foreach-nested.yaml")
|
||||
void nested(Execution execution) {
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,6 @@ package io.kestra.plugin.core.flow;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
@@ -104,4 +103,61 @@ class RuntimeLabelsTest {
|
||||
new Label("taskRunId", labelsTaskRunId),
|
||||
new Label("existingLabel", "someValue"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/primitive-labels-flow.yml"})
|
||||
void primitiveTypeLabelsOverrideExistingLabels() throws TimeoutException, QueueException {
|
||||
Execution execution = runnerUtils.runOne(
|
||||
MAIN_TENANT,
|
||||
"io.kestra.tests",
|
||||
"primitive-labels-flow",
|
||||
null,
|
||||
(flow, createdExecution) -> Map.of(
|
||||
"intLabel", 42,
|
||||
"boolLabel", true,
|
||||
"floatLabel", 3.14f
|
||||
),
|
||||
null,
|
||||
List.of(
|
||||
new Label("intValue", "1"),
|
||||
new Label("boolValue", "false"),
|
||||
new Label("floatValue", "4.2f")
|
||||
)
|
||||
);
|
||||
|
||||
assertThat(execution.getTaskRunList()).hasSize(1);
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
|
||||
String labelsTaskRunId = execution.findTaskRunsByTaskId("update-labels").getFirst().getId();
|
||||
|
||||
assertThat(execution.getLabels()).containsExactlyInAnyOrder(
|
||||
new Label(Label.CORRELATION_ID, execution.getId()),
|
||||
new Label("intValue", "42"),
|
||||
new Label("boolValue", "true"),
|
||||
new Label("floatValue", "3.14"),
|
||||
new Label("taskRunId", labelsTaskRunId));
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/labels-update-task-deduplicate.yml"})
|
||||
void updateGetsDeduplicated() throws TimeoutException, QueueException {
|
||||
Execution execution = runnerUtils.runOne(
|
||||
MAIN_TENANT,
|
||||
"io.kestra.tests",
|
||||
"labels-update-task-deduplicate",
|
||||
null,
|
||||
(flow, createdExecution) -> Map.of(),
|
||||
null,
|
||||
List.of()
|
||||
);
|
||||
|
||||
assertThat(execution.getTaskRunList()).hasSize(2);
|
||||
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
|
||||
assertThat(execution.getLabels()).containsExactlyInAnyOrder(
|
||||
new Label(Label.CORRELATION_ID, execution.getId()),
|
||||
new Label("fromStringKey", "value2"),
|
||||
new Label("fromListKey", "value2")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,90 @@
|
||||
package io.kestra.plugin.core.flow;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@KestraTest(startRunner = true)
|
||||
class SubflowRunnerTest {
|
||||
|
||||
@Inject
|
||||
private RunnerUtils runnerUtils;
|
||||
|
||||
@Inject
|
||||
private ExecutionRepositoryInterface executionRepository;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.EXECUTION_NAMED)
|
||||
protected QueueInterface<Execution> executionQueue;
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/subflow-inherited-labels-child.yaml", "flows/valids/subflow-inherited-labels-parent.yaml"})
|
||||
void inheritedLabelsAreOverridden() throws QueueException, TimeoutException {
|
||||
Execution parentExecution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-inherited-labels-parent");
|
||||
|
||||
assertThat(parentExecution.getLabels()).containsExactlyInAnyOrder(
|
||||
new Label(Label.CORRELATION_ID, parentExecution.getId()),
|
||||
new Label("parentFlowLabel1", "value1"),
|
||||
new Label("parentFlowLabel2", "value2")
|
||||
);
|
||||
|
||||
String childExecutionId = (String) parentExecution.findTaskRunsByTaskId("launch").getFirst().getOutputs().get("executionId");
|
||||
|
||||
assertThat(childExecutionId).isNotBlank();
|
||||
|
||||
Execution childExecution = executionRepository.findById(MAIN_TENANT, childExecutionId).orElseThrow();
|
||||
|
||||
assertThat(childExecution.getLabels()).containsExactlyInAnyOrder(
|
||||
new Label(Label.CORRELATION_ID, parentExecution.getId()), // parent's correlation ID
|
||||
new Label("childFlowLabel1", "value1"), // defined by the subtask flow
|
||||
new Label("childFlowLabel2", "value2"), // defined by the subtask flow
|
||||
new Label("launchTaskLabel", "launchFoo"), // added by Subtask
|
||||
new Label("parentFlowLabel1", "launchBar"), // overridden by Subtask
|
||||
new Label("parentFlowLabel2", "value2") // inherited from the parent flow
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/subflow-parent-no-wait.yaml", "flows/valids/subflow-child-with-output.yaml"})
|
||||
void subflowOutputWithoutWait() throws QueueException, TimeoutException, InterruptedException {
|
||||
AtomicReference<Execution> childExecution = new AtomicReference<>();
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
Runnable closing = executionQueue.receive(either -> {
|
||||
if (either.isLeft() && either.getLeft().getFlowId().equals("subflow-child-with-output") && either.getLeft().getState().isTerminated()) {
|
||||
childExecution.set(either.getLeft());
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Execution parentExecution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-parent-no-wait");
|
||||
String childExecutionId = (String) parentExecution.findTaskRunsByTaskId("subflow").getFirst().getOutputs().get("executionId");
|
||||
assertThat(childExecutionId).isNotBlank();
|
||||
assertThat(parentExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(parentExecution.getTaskRunList()).hasSize(1);
|
||||
|
||||
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
|
||||
assertThat(childExecution.get().getId()).isEqualTo(childExecutionId);
|
||||
assertThat(childExecution.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
|
||||
assertThat(childExecution.get().getTaskRunList()).hasSize(1);
|
||||
closing.run();
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ namespace: io.kestra.tests
|
||||
tasks:
|
||||
- id: cache
|
||||
type: io.kestra.core.runners.TaskCacheTest$CounterTask
|
||||
workingDir: "{{workingDir}}"
|
||||
taskCache:
|
||||
enabled: true
|
||||
ttl: PT1S
|
||||
@@ -0,0 +1,21 @@
|
||||
id: flow-concurrency-for-each-item
|
||||
namespace: io.kestra.tests
|
||||
|
||||
inputs:
|
||||
- id: file
|
||||
type: FILE
|
||||
- id: batch
|
||||
type: INT
|
||||
|
||||
tasks:
|
||||
- id: each
|
||||
type: io.kestra.plugin.core.flow.ForEachItem
|
||||
items: "{{ inputs.file }}"
|
||||
batch:
|
||||
rows: "{{inputs.batch}}"
|
||||
namespace: io.kestra.tests
|
||||
flowId: flow-concurrency-queue
|
||||
wait: true
|
||||
transmitFailed: true
|
||||
inputs:
|
||||
items: "{{ taskrun.items }}"
|
||||
@@ -0,0 +1,13 @@
|
||||
id: flow-concurrency-queue-fail
|
||||
namespace: io.kestra.tests
|
||||
|
||||
concurrency:
|
||||
behavior: QUEUE
|
||||
limit: 1
|
||||
|
||||
tasks:
|
||||
- id: sleep
|
||||
type: io.kestra.plugin.core.flow.Sleep
|
||||
duration: PT2S
|
||||
- id: fail
|
||||
type: io.kestra.plugin.core.execution.Fail
|
||||
21
core/src/test/resources/flows/valids/foreach-nested.yaml
Normal file
21
core/src/test/resources/flows/valids/foreach-nested.yaml
Normal file
@@ -0,0 +1,21 @@
|
||||
id: foreach-nested
|
||||
namespace: io.kestra.tests
|
||||
tasks:
|
||||
- id: each0
|
||||
type: io.kestra.plugin.core.flow.ForEach
|
||||
values: ["l1", "l2"]
|
||||
tasks:
|
||||
- id: each1
|
||||
type: io.kestra.plugin.core.flow.ForEach
|
||||
concurrencyLimit: 0
|
||||
values: ["d1", "d2", "d3"]
|
||||
tasks:
|
||||
- id: p1
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "{{ parent.taskrun.value }}-{{ taskrun.value }}"
|
||||
- id: p2
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "{{ outputs.p1[parent.taskrun.value][taskrun.value].value }}"
|
||||
- id: test
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "{{ outputs.p1 }}"
|
||||
@@ -0,0 +1,14 @@
|
||||
id: labels-update-task-deduplicate
|
||||
namespace: io.kestra.tests
|
||||
|
||||
tasks:
|
||||
- id: from-string
|
||||
type: io.kestra.plugin.core.execution.Labels
|
||||
labels: "{ \"fromStringKey\": \"value1\", \"fromStringKey\": \"value2\" }"
|
||||
- id: from-list
|
||||
type: io.kestra.plugin.core.execution.Labels
|
||||
labels:
|
||||
- key: "fromListKey"
|
||||
value: "value1"
|
||||
- key: "fromListKey"
|
||||
value: "value2"
|
||||
@@ -0,0 +1,12 @@
|
||||
id: subflow-child-with-output
|
||||
namespace: io.kestra.tests
|
||||
|
||||
tasks:
|
||||
- id: return
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "Some value"
|
||||
|
||||
outputs:
|
||||
- id: flow_a_output
|
||||
type: STRING
|
||||
value: "{{ outputs.return.value }}"
|
||||
@@ -0,0 +1,11 @@
|
||||
id: subflow-inherited-labels-child
|
||||
namespace: io.kestra.tests
|
||||
|
||||
labels:
|
||||
childFlowLabel1: value1
|
||||
childFlowLabel2: value2
|
||||
|
||||
tasks:
|
||||
- id: return
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: "{{ execution.id }}"
|
||||
@@ -0,0 +1,18 @@
|
||||
id: subflow-inherited-labels-parent
|
||||
namespace: io.kestra.tests
|
||||
|
||||
labels:
|
||||
parentFlowLabel1: value1
|
||||
parentFlowLabel2: value2
|
||||
|
||||
tasks:
|
||||
- id: launch
|
||||
type: io.kestra.plugin.core.flow.Subflow
|
||||
namespace: io.kestra.tests
|
||||
flowId: subflow-inherited-labels-child
|
||||
wait: true
|
||||
transmitFailed: true
|
||||
inheritLabels: true
|
||||
labels:
|
||||
launchTaskLabel: launchFoo
|
||||
parentFlowLabel1: launchBar
|
||||
@@ -0,0 +1,9 @@
|
||||
id: subflow-parent-no-wait
|
||||
namespace: io.kestra.tests
|
||||
|
||||
tasks:
|
||||
- id: subflow
|
||||
type: io.kestra.plugin.core.flow.Subflow
|
||||
namespace: io.kestra.tests
|
||||
flowId: subflow-child-with-output
|
||||
wait: false
|
||||
@@ -1,152 +1,199 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
#===============================================================================
|
||||
# SCRIPT: release-plugins.sh
|
||||
#
|
||||
# DESCRIPTION:
|
||||
# This script can be used to run a ./gradlew release command on each kestra plugin repository.
|
||||
# By default, if no `GITHUB_PAT` environment variable exist, the script will attempt to clone GitHub repositories using SSH_KEY.
|
||||
# Runs Gradle release for one or multiple Kestra plugin repositories.
|
||||
# - If $GITHUB_PAT is set, HTTPS cloning via PAT is used.
|
||||
# - Otherwise, SSH cloning is used (requires SSH key configured on runner).
|
||||
#
|
||||
# USAGE:
|
||||
# ./release-plugins.sh [options] [plugin-repositories...]
|
||||
#
|
||||
# USAGE: ./release-plugins.sh [options]
|
||||
# OPTIONS:
|
||||
# --release-version <version> Specify the release version (required)
|
||||
# --next-version <version> Specify the next version (required)
|
||||
# --dry-run Specify to run in DRY_RUN.
|
||||
# -y, --yes Automatically confirm prompts (non-interactive).
|
||||
# -h, --help Show the help message and exit
|
||||
|
||||
# --release-version <version> Specify the release version (required).
|
||||
# --next-version <version> Specify the next (development) version (required).
|
||||
# --plugin-file <path> File containing the plugin list (default: ../.plugins).
|
||||
# --dry-run Run in DRY_RUN mode (no publish, no changes pushed).
|
||||
# --only-changed Skip repositories with no commits since last tag (or --since-tag).
|
||||
# --since-tag <tag> Use this tag as base for change detection (default: last tag).
|
||||
# -y, --yes Automatically confirm prompts (non-interactive).
|
||||
# -h, --help Show this help message and exit.
|
||||
#
|
||||
# EXAMPLES:
|
||||
# To release all plugins:
|
||||
# ./release-plugins.sh --release-version=0.20.0 --next-version=0.21.0-SNAPSHOT
|
||||
# To release a specific plugin:
|
||||
# ./release-plugins.sh --release-version=0.20.0 --next-version=0.21.0-SNAPSHOT plugin-kubernetes
|
||||
# To release specific plugins from file:
|
||||
# ./release-plugins.sh --release-version=0.20.0 --plugin-file .plugins
|
||||
# # Release all plugins from .plugins:
|
||||
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT
|
||||
#
|
||||
# # Release a specific plugin:
|
||||
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT plugin-kubernetes
|
||||
#
|
||||
# # Release specific plugins from file:
|
||||
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT --plugin-file .plugins
|
||||
#
|
||||
# # Release only plugins that have changed since the last tag:
|
||||
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT --only-changed --yes
|
||||
#===============================================================================
|
||||
|
||||
set -e;
|
||||
set -euo pipefail
|
||||
|
||||
###############################################################
|
||||
# Global vars
|
||||
# Globals
|
||||
###############################################################
|
||||
BASEDIR=$(dirname "$(readlink -f $0)")
|
||||
WORKING_DIR=/tmp/kestra-release-plugins-$(date +%s);
|
||||
BASEDIR=$(dirname "$(readlink -f "$0")")
|
||||
WORKING_DIR="/tmp/kestra-release-plugins-$(date +%s)"
|
||||
PLUGIN_FILE="$BASEDIR/../.plugins"
|
||||
GIT_BRANCH=master
|
||||
GIT_BRANCH="master" # Fallback if default branch cannot be detected
|
||||
|
||||
###############################################################
|
||||
# Functions
|
||||
###############################################################
|
||||
|
||||
# Function to display the help message
|
||||
usage() {
|
||||
echo "Usage: $0 --release-version <version> --next-version [plugin-repositories...]"
|
||||
echo
|
||||
echo "Options:"
|
||||
echo " --release-version <version> Specify the release version (required)."
|
||||
echo " --next-version <version> Specify the next version (required)."
|
||||
echo " --plugin-file File containing the plugin list (default: .plugins)"
|
||||
echo " --dry-run Specify to run in DRY_RUN."
|
||||
echo " -y, --yes Automatically confirm prompts (non-interactive)."
|
||||
echo " -h, --help Show this help message and exit."
|
||||
exit 1
|
||||
echo "Usage: $0 --release-version <version> --next-version <version> [options] [plugin-repositories...]"
|
||||
echo
|
||||
echo "Options:"
|
||||
echo " --release-version <version> Specify the release version (required)."
|
||||
echo " --next-version <version> Specify the next version (required)."
|
||||
echo " --plugin-file <path> File containing the plugin list (default: ../.plugins)."
|
||||
echo " --dry-run Run in DRY_RUN mode."
|
||||
echo " --only-changed Skip repositories with no commits since last tag (or --since-tag)."
|
||||
echo " --since-tag <tag> Use this tag as base for change detection (default: last tag)."
|
||||
echo " -y, --yes Automatically confirm prompts (non-interactive)."
|
||||
echo " -h, --help Show this help message and exit."
|
||||
exit 1
|
||||
}
|
||||
|
||||
# Function to ask to continue
|
||||
function askToContinue() {
|
||||
read -p "Are you sure you want to continue? [y/N] " confirm
|
||||
askToContinue() {
|
||||
read -r -p "Are you sure you want to continue? [y/N] " confirm
|
||||
[[ "$confirm" =~ ^[Yy]$ ]] || { echo "Operation cancelled."; exit 1; }
|
||||
}
|
||||
|
||||
# Detect default branch from remote; fallback to $GIT_BRANCH if unknown
|
||||
detect_default_branch() {
|
||||
local default_branch
|
||||
default_branch=$(git remote show origin | sed -n '/HEAD branch/s/.*: //p' || true)
|
||||
if [[ -z "${default_branch:-}" ]]; then
|
||||
default_branch="$GIT_BRANCH"
|
||||
fi
|
||||
echo "$default_branch"
|
||||
}
|
||||
|
||||
# Return last tag that matches v* or any tag if v* not found; empty if none
|
||||
last_tag_or_empty() {
|
||||
local tag
|
||||
tag=$(git tag --list 'v*' --sort=-v:refname | head -n1 || true)
|
||||
if [[ -z "${tag:-}" ]]; then
|
||||
tag=$(git tag --sort=-creatordate | head -n1 || true)
|
||||
fi
|
||||
echo "$tag"
|
||||
}
|
||||
|
||||
# True (0) if there are commits since tag on branch, False (1) otherwise.
|
||||
has_changes_since_tag() {
|
||||
local tag="$1"
|
||||
local branch="$2"
|
||||
if [[ -z "$tag" ]]; then
|
||||
# No tag => consider it as changed (first release)
|
||||
return 0
|
||||
fi
|
||||
git fetch --tags --quiet
|
||||
git fetch origin "$branch" --quiet
|
||||
local count
|
||||
count=$(git rev-list --count "${tag}..origin/${branch}" || echo "0")
|
||||
[[ "${count}" -gt 0 ]]
|
||||
}
|
||||
|
||||
###############################################################
|
||||
# Options
|
||||
# Options parsing
|
||||
###############################################################
|
||||
|
||||
PLUGINS_ARGS=()
|
||||
AUTO_YES=false
|
||||
DRY_RUN=false
|
||||
# Get the options
|
||||
ONLY_CHANGED=false
|
||||
SINCE_TAG=""
|
||||
|
||||
RELEASE_VERSION=""
|
||||
NEXT_VERSION=""
|
||||
|
||||
while [[ "$#" -gt 0 ]]; do
|
||||
case "$1" in
|
||||
--release-version)
|
||||
RELEASE_VERSION="$2"
|
||||
shift 2
|
||||
;;
|
||||
--release-version=*)
|
||||
RELEASE_VERSION="${1#*=}"
|
||||
shift
|
||||
;;
|
||||
--next-version)
|
||||
NEXT_VERSION="$2"
|
||||
shift 2
|
||||
;;
|
||||
--next-version=*)
|
||||
NEXT_VERSION="${1#*=}"
|
||||
shift
|
||||
;;
|
||||
--plugin-file)
|
||||
PLUGIN_FILE="$2"
|
||||
shift 2
|
||||
;;
|
||||
--plugin-file=*)
|
||||
PLUGIN_FILE="${1#*=}"
|
||||
shift
|
||||
;;
|
||||
--dry-run)
|
||||
DRY_RUN=true
|
||||
shift
|
||||
;;
|
||||
-y|--yes)
|
||||
AUTO_YES=true
|
||||
shift
|
||||
;;
|
||||
-h|--help)
|
||||
usage
|
||||
;;
|
||||
*)
|
||||
PLUGINS_ARGS+=("$1")
|
||||
shift
|
||||
;;
|
||||
esac
|
||||
case "$1" in
|
||||
--release-version)
|
||||
RELEASE_VERSION="$2"; shift 2 ;;
|
||||
--release-version=*)
|
||||
RELEASE_VERSION="${1#*=}"; shift ;;
|
||||
--next-version)
|
||||
NEXT_VERSION="$2"; shift 2 ;;
|
||||
--next-version=*)
|
||||
NEXT_VERSION="${1#*=}"; shift ;;
|
||||
--plugin-file)
|
||||
PLUGIN_FILE="$2"; shift 2 ;;
|
||||
--plugin-file=*)
|
||||
PLUGIN_FILE="${1#*=}"; shift ;;
|
||||
--dry-run)
|
||||
DRY_RUN=true; shift ;;
|
||||
--only-changed)
|
||||
ONLY_CHANGED=true; shift ;;
|
||||
--since-tag)
|
||||
SINCE_TAG="$2"; shift 2 ;;
|
||||
--since-tag=*)
|
||||
SINCE_TAG="${1#*=}"; shift ;;
|
||||
-y|--yes)
|
||||
AUTO_YES=true; shift ;;
|
||||
-h|--help)
|
||||
usage ;;
|
||||
*)
|
||||
PLUGINS_ARGS+=("$1"); shift ;;
|
||||
esac
|
||||
done
|
||||
|
||||
## Check options
|
||||
# Required options
|
||||
if [[ -z "$RELEASE_VERSION" ]]; then
|
||||
echo -e "Missing required argument: --release-version\n";
|
||||
usage
|
||||
echo -e "Missing required argument: --release-version\n"; usage
|
||||
fi
|
||||
|
||||
if [[ -z "$NEXT_VERSION" ]]; then
|
||||
echo -e "Missing required argument: --next-version\n";
|
||||
usage
|
||||
echo -e "Missing required argument: --next-version\n"; usage
|
||||
fi
|
||||
|
||||
## Get plugin list
|
||||
###############################################################
|
||||
# Build plugin list (from args or from .plugins)
|
||||
###############################################################
|
||||
PLUGINS_ARRAY=()
|
||||
PLUGINS_COUNT=0
|
||||
|
||||
if [[ "${#PLUGINS_ARGS[@]}" -eq 0 ]]; then
|
||||
if [ -f "$PLUGIN_FILE" ]; then
|
||||
PLUGINS=$(cat "$PLUGIN_FILE" | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | cut -d':' -f1 | uniq | sort);
|
||||
PLUGINS_COUNT=$(echo "$PLUGINS" | wc -l);
|
||||
PLUGINS_ARRAY=$(echo "$PLUGINS" | xargs || echo '');
|
||||
PLUGINS_ARRAY=($PLUGINS_ARRAY);
|
||||
if [[ -f "$PLUGIN_FILE" ]]; then
|
||||
# Keep only uncommented lines, then keep the first column (repo name)
|
||||
mapfile -t PLUGINS_ARRAY < <(
|
||||
grep -E '^\s*[^#]' "$PLUGIN_FILE" 2>/dev/null \
|
||||
| grep "io\.kestra\." \
|
||||
| cut -d':' -f1 \
|
||||
| uniq | sort
|
||||
)
|
||||
PLUGINS_COUNT="${#PLUGINS_ARRAY[@]}"
|
||||
else
|
||||
echo "Plugin file not found: $PLUGIN_FILE"
|
||||
exit 1
|
||||
fi
|
||||
else
|
||||
PLUGINS_ARRAY=("${PLUGINS_ARGS[@]}")
|
||||
PLUGINS_COUNT="${#PLUGINS_ARGS[@]}"
|
||||
fi
|
||||
|
||||
# Extract the major and minor versions
|
||||
# Extract major.minor (e.g. 0.21) to build the release branch name
|
||||
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
|
||||
PUSH_RELEASE_BRANCH="releases/v${BASE_VERSION}.x"
|
||||
|
||||
## Get plugin list
|
||||
echo "RELEASE_VERSION=$RELEASE_VERSION"
|
||||
echo "NEXT_VERSION=$NEXT_VERSION"
|
||||
echo "PUSH_RELEASE_BRANCH=$PUSH_RELEASE_BRANCH"
|
||||
echo "GIT_BRANCH=$GIT_BRANCH"
|
||||
echo "GIT_BRANCH=$GIT_BRANCH (fallback)"
|
||||
echo "DRY_RUN=$DRY_RUN"
|
||||
echo "Found ($PLUGINS_COUNT) plugin repositories:";
|
||||
|
||||
echo "ONLY_CHANGED=$ONLY_CHANGED"
|
||||
echo "SINCE_TAG=${SINCE_TAG:-<auto>}"
|
||||
echo "Found ($PLUGINS_COUNT) plugin repositories:"
|
||||
for PLUGIN in "${PLUGINS_ARRAY[@]}"; do
|
||||
echo "$PLUGIN"
|
||||
echo " - $PLUGIN"
|
||||
done
|
||||
|
||||
if [[ "$AUTO_YES" == false ]]; then
|
||||
@@ -156,49 +203,77 @@ fi
|
||||
###############################################################
|
||||
# Main
|
||||
###############################################################
|
||||
mkdir -p $WORKING_DIR
|
||||
mkdir -p "$WORKING_DIR"
|
||||
|
||||
COUNTER=1;
|
||||
for PLUGIN in "${PLUGINS_ARRAY[@]}"
|
||||
do
|
||||
cd $WORKING_DIR;
|
||||
COUNTER=1
|
||||
for PLUGIN in "${PLUGINS_ARRAY[@]}"; do
|
||||
cd "$WORKING_DIR"
|
||||
|
||||
echo "---------------------------------------------------------------------------------------"
|
||||
echo "[$COUNTER/$PLUGINS_COUNT] Release Plugin: $PLUGIN"
|
||||
echo "---------------------------------------------------------------------------------------"
|
||||
if [[ -z "${GITHUB_PAT}" ]]; then
|
||||
git clone git@github.com:kestra-io/$PLUGIN
|
||||
|
||||
# Clone the repo using SSH, otherwise PAT if provided
|
||||
if [[ -z "${GITHUB_PAT:-}" ]]; then
|
||||
git clone "git@github.com:kestra-io/${PLUGIN}.git"
|
||||
else
|
||||
echo "Clone git repository using GITHUB PAT"
|
||||
git clone https://${GITHUB_PAT}@github.com/kestra-io/$PLUGIN.git
|
||||
git clone "https://${GITHUB_PAT}@github.com/kestra-io/${PLUGIN}.git"
|
||||
fi
|
||||
cd "$PLUGIN";
|
||||
|
||||
if [[ "$PLUGIN" == "plugin-transform" ]] && [[ "$GIT_BRANCH" == "master" ]]; then # quickfix
|
||||
git checkout main;
|
||||
else
|
||||
git checkout "$GIT_BRANCH";
|
||||
cd "$PLUGIN"
|
||||
|
||||
# Determine the default branch dynamically to avoid hardcoding "master"/"main"
|
||||
DEFAULT_BRANCH=$(detect_default_branch)
|
||||
git checkout "$DEFAULT_BRANCH"
|
||||
|
||||
# Skip if the release tag already exists on remote (check both with and without 'v' prefix)
|
||||
TAG_EXISTS=$(
|
||||
{ git ls-remote --tags origin "refs/tags/v${RELEASE_VERSION}" \
|
||||
&& git ls-remote --tags origin "refs/tags/${RELEASE_VERSION}"; } | wc -l
|
||||
)
|
||||
if [[ "$TAG_EXISTS" -ne 0 ]]; then
|
||||
echo "Tag ${RELEASE_VERSION} already exists for $PLUGIN. Skipping..."
|
||||
COUNTER=$(( COUNTER + 1 ))
|
||||
continue
|
||||
fi
|
||||
|
||||
# Change detection (if requested)
|
||||
if [[ "$ONLY_CHANGED" == true ]]; then
|
||||
git fetch --tags --quiet
|
||||
git fetch origin "$DEFAULT_BRANCH" --quiet
|
||||
BASE_TAG="$SINCE_TAG"
|
||||
if [[ -z "$BASE_TAG" ]]; then
|
||||
BASE_TAG=$(last_tag_or_empty)
|
||||
fi
|
||||
|
||||
if has_changes_since_tag "$BASE_TAG" "$DEFAULT_BRANCH"; then
|
||||
echo "Changes detected since ${BASE_TAG:-<no-tag>} on ${DEFAULT_BRANCH}, proceeding."
|
||||
else
|
||||
echo "No changes since ${BASE_TAG:-<no-tag>} on ${DEFAULT_BRANCH} for $PLUGIN. Skipping..."
|
||||
COUNTER=$(( COUNTER + 1 ))
|
||||
continue
|
||||
fi
|
||||
fi
|
||||
|
||||
if [[ "$DRY_RUN" == false ]]; then
|
||||
CURRENT_BRANCH=$(git branch --show-current);
|
||||
|
||||
echo "Run gradle release for plugin: $PLUGIN";
|
||||
echo "Branch: $CURRENT_BRANCH";
|
||||
CURRENT_BRANCH=$(git branch --show-current)
|
||||
echo "Run gradle release for plugin: $PLUGIN"
|
||||
echo "Branch: $CURRENT_BRANCH"
|
||||
|
||||
if [[ "$AUTO_YES" == false ]]; then
|
||||
askToContinue
|
||||
fi
|
||||
|
||||
# Create and push release branch
|
||||
git checkout -b "$PUSH_RELEASE_BRANCH";
|
||||
git push -u origin "$PUSH_RELEASE_BRANCH";
|
||||
# Create and push the release branch (branch that will hold the release versions)
|
||||
git checkout -b "$PUSH_RELEASE_BRANCH"
|
||||
git push -u origin "$PUSH_RELEASE_BRANCH"
|
||||
|
||||
# Run gradle release
|
||||
git checkout "$CURRENT_BRANCH";
|
||||
# Switch back to the working branch to run the gradle release
|
||||
git checkout "$CURRENT_BRANCH"
|
||||
|
||||
# Run Gradle release with snapshot tolerance if releaseVersion contains -SNAPSHOT
|
||||
if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
|
||||
# -SNAPSHOT qualifier maybe used to test release-candidates
|
||||
./gradlew release -Prelease.useAutomaticVersion=true \
|
||||
-Prelease.releaseVersion="${RELEASE_VERSION}" \
|
||||
-Prelease.newVersion="${NEXT_VERSION}" \
|
||||
@@ -211,19 +286,28 @@ do
|
||||
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
|
||||
fi
|
||||
|
||||
git push;
|
||||
# Update the upper bound version of kestra
|
||||
# Push new commits/tags created by the release plugin
|
||||
git push --follow-tags
|
||||
|
||||
# Update the upper bound version of Kestra on the release branch (e.g., [0.21,))
|
||||
PLUGIN_KESTRA_VERSION="[${BASE_VERSION},)"
|
||||
git checkout "$PUSH_RELEASE_BRANCH" && git pull;
|
||||
git checkout "$PUSH_RELEASE_BRANCH" && git pull --ff-only
|
||||
sed -i "s/^kestraVersion=.*/kestraVersion=${PLUGIN_KESTRA_VERSION}/" ./gradle.properties
|
||||
git add ./gradle.properties
|
||||
git commit -m"chore(deps): update kestraVersion to ${PLUGIN_KESTRA_VERSION}."
|
||||
git push
|
||||
sleep 5; # add a short delay to not spam Maven Central
|
||||
else
|
||||
echo "Skip gradle release [DRY_RUN=true]";
|
||||
fi
|
||||
COUNTER=$(( COUNTER + 1 ));
|
||||
done;
|
||||
|
||||
exit 0;
|
||||
# Commit only if there are actual changes staged
|
||||
if ! git diff --cached --quiet; then
|
||||
git commit -m "chore(deps): update kestraVersion to ${PLUGIN_KESTRA_VERSION}."
|
||||
git push
|
||||
fi
|
||||
|
||||
# Small delay to avoid hammering Maven Central
|
||||
sleep 5
|
||||
else
|
||||
echo "Skip gradle release [DRY_RUN=true]"
|
||||
fi
|
||||
|
||||
COUNTER=$(( COUNTER + 1 ))
|
||||
done
|
||||
|
||||
exit 0
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
version=0.24.0-SNAPSHOT
|
||||
version=1.0.0-SNAPSHOT
|
||||
|
||||
org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
|
||||
org.gradle.parallel=true
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
package io.kestra.runner.h2;
|
||||
|
||||
import io.kestra.core.runners.ExecutionRunning;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
|
||||
import io.kestra.repository.h2.H2Repository;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@H2QueueEnabled
|
||||
public class H2ExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
|
||||
public H2ExecutionRunningStorage(@Named("executionrunning") H2Repository<ExecutionRunning> repository) {
|
||||
super(repository);
|
||||
}
|
||||
}
|
||||
@@ -144,4 +144,12 @@ public class H2QueueFactory implements QueueFactoryInterface {
|
||||
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
|
||||
return new H2Queue<>(SubflowExecutionEnd.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
|
||||
@Bean(preDestroy = "close")
|
||||
public QueueInterface<ExecutionRunning> executionRunning() {
|
||||
return new H2Queue<>(ExecutionRunning.class, applicationContext);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
CREATE TABLE IF NOT EXISTS execution_running (
|
||||
"key" VARCHAR(250) NOT NULL PRIMARY KEY,
|
||||
"value" TEXT NOT NULL,
|
||||
"tenant_id" VARCHAR(250) GENERATED ALWAYS AS (JQ_STRING("value", '.tenantId')),
|
||||
"namespace" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.namespace')),
|
||||
"flow_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.flowId'))
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS execution_running__flow ON execution_running ("tenant_id", "namespace", "flow_id");
|
||||
|
||||
ALTER TABLE queues ALTER COLUMN "type" ENUM(
|
||||
'io.kestra.core.models.executions.Execution',
|
||||
'io.kestra.core.models.templates.Template',
|
||||
'io.kestra.core.models.executions.ExecutionKilled',
|
||||
'io.kestra.core.runners.WorkerJob',
|
||||
'io.kestra.core.runners.WorkerTaskResult',
|
||||
'io.kestra.core.runners.WorkerInstance',
|
||||
'io.kestra.core.runners.WorkerTaskRunning',
|
||||
'io.kestra.core.models.executions.LogEntry',
|
||||
'io.kestra.core.models.triggers.Trigger',
|
||||
'io.kestra.ee.models.audits.AuditLog',
|
||||
'io.kestra.core.models.executions.MetricEntry',
|
||||
'io.kestra.core.runners.WorkerTriggerResult',
|
||||
'io.kestra.core.runners.SubflowExecutionResult',
|
||||
'io.kestra.core.server.ClusterEvent',
|
||||
'io.kestra.core.runners.SubflowExecutionEnd',
|
||||
'io.kestra.core.models.flows.FlowInterface',
|
||||
'io.kestra.core.runners.ExecutionRunning'
|
||||
) NOT NULL
|
||||
@@ -0,0 +1,15 @@
|
||||
package io.kestra.runner.mysql;
|
||||
|
||||
import io.kestra.core.runners.ExecutionRunning;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
|
||||
import io.kestra.repository.mysql.MysqlRepository;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@MysqlQueueEnabled
|
||||
public class MysqlExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
|
||||
public MysqlExecutionRunningStorage(@Named("executionrunning") MysqlRepository<ExecutionRunning> repository) {
|
||||
super(repository);
|
||||
}
|
||||
}
|
||||
@@ -144,4 +144,12 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
|
||||
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
|
||||
return new MysqlQueue<>(SubflowExecutionEnd.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
|
||||
@Bean(preDestroy = "close")
|
||||
public QueueInterface<ExecutionRunning> executionRunning() {
|
||||
return new MysqlQueue<>(ExecutionRunning.class, applicationContext);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
CREATE TABLE IF NOT EXISTS execution_running (
|
||||
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
|
||||
`value` JSON NOT NULL,
|
||||
`tenant_id` VARCHAR(250) GENERATED ALWAYS AS (value ->> '$.tenantId') STORED,
|
||||
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
|
||||
`flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
|
||||
INDEX ix_flow (tenant_id, namespace, flow_id)
|
||||
);
|
||||
|
||||
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
|
||||
'io.kestra.core.models.executions.Execution',
|
||||
'io.kestra.core.models.templates.Template',
|
||||
'io.kestra.core.models.executions.ExecutionKilled',
|
||||
'io.kestra.core.runners.WorkerJob',
|
||||
'io.kestra.core.runners.WorkerTaskResult',
|
||||
'io.kestra.core.runners.WorkerInstance',
|
||||
'io.kestra.core.runners.WorkerTaskRunning',
|
||||
'io.kestra.core.models.executions.LogEntry',
|
||||
'io.kestra.core.models.triggers.Trigger',
|
||||
'io.kestra.ee.models.audits.AuditLog',
|
||||
'io.kestra.core.models.executions.MetricEntry',
|
||||
'io.kestra.core.runners.WorkerTriggerResult',
|
||||
'io.kestra.core.runners.SubflowExecutionResult',
|
||||
'io.kestra.core.server.ClusterEvent',
|
||||
'io.kestra.core.runners.SubflowExecutionEnd',
|
||||
'io.kestra.core.models.flows.FlowInterface',
|
||||
'io.kestra.core.runners.ExecutionRunning'
|
||||
) NOT NULL;
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE queues MODIFY COLUMN `offset` BIGINT NOT NULL AUTO_INCREMENT;
|
||||
@@ -0,0 +1,15 @@
|
||||
package io.kestra.runner.postgres;
|
||||
|
||||
import io.kestra.core.runners.ExecutionRunning;
|
||||
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
|
||||
import io.kestra.repository.postgres.PostgresRepository;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@PostgresQueueEnabled
|
||||
public class PostgresExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
|
||||
public PostgresExecutionRunningStorage(@Named("executionrunning") PostgresRepository<ExecutionRunning> repository) {
|
||||
super(repository);
|
||||
}
|
||||
}
|
||||
@@ -144,4 +144,12 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
|
||||
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
|
||||
return new PostgresQueue<>(SubflowExecutionEnd.class, applicationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Singleton
|
||||
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
|
||||
@Bean(preDestroy = "close")
|
||||
public QueueInterface<ExecutionRunning> executionRunning() {
|
||||
return new PostgresQueue<>(ExecutionRunning.class, applicationContext);
|
||||
}
|
||||
}
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user