mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
207 Commits
fix/remove
...
v0.21.10
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1cff398fca | ||
|
|
8e335ce1c3 | ||
|
|
da15fff781 | ||
|
|
0c2f0ddc1b | ||
|
|
724a9e28ca | ||
|
|
df31d9cc4a | ||
|
|
560f638657 | ||
|
|
c71366ad4d | ||
|
|
0bf60bd5fc | ||
|
|
e557ec45f5 | ||
|
|
ed1bbb9444 | ||
|
|
1da2e25f34 | ||
|
|
5bb2e6264c | ||
|
|
d89b22cce7 | ||
|
|
cebbcc8f5a | ||
|
|
9b8db2014d | ||
|
|
1a8750dc96 | ||
|
|
072674a1e1 | ||
|
|
0fd60f811a | ||
|
|
d1a63e71bb | ||
|
|
17ed32556a | ||
|
|
7dd7d4507e | ||
|
|
54da3fc268 | ||
|
|
ce94c9ad29 | ||
|
|
f80bdc2909 | ||
|
|
f0fcdf4851 | ||
|
|
2aeb11e2da | ||
|
|
368f4f22db | ||
|
|
3f7c385aba | ||
|
|
59dc598ee9 | ||
|
|
49a8d13b85 | ||
|
|
67e4154069 | ||
|
|
4bb2de219d | ||
|
|
27c8762628 | ||
|
|
3d29077a99 | ||
|
|
33ac1ac535 | ||
|
|
eec8fb9fb4 | ||
|
|
df5d13467a | ||
|
|
65999abb3f | ||
|
|
58d82b79f8 | ||
|
|
5131c347cd | ||
|
|
b7259cc796 | ||
|
|
ea26e4dda7 | ||
|
|
5b1d216c40 | ||
|
|
49977f505f | ||
|
|
1dc8401a0e | ||
|
|
c004ba1a6b | ||
|
|
cfe4e2a3c2 | ||
|
|
d4b7650633 | ||
|
|
312221ef73 | ||
|
|
9774d46d8b | ||
|
|
b3699eda5f | ||
|
|
527c13dec6 | ||
|
|
e89b102ea3 | ||
|
|
0933439503 | ||
|
|
09d5b2f584 | ||
|
|
4abbc4cbc7 | ||
|
|
d84d66005a | ||
|
|
e4c7c0f103 | ||
|
|
278289a0c7 | ||
|
|
07329555c0 | ||
|
|
49cfb8f425 | ||
|
|
9aeda7160b | ||
|
|
51e5d35e67 | ||
|
|
522cbbc9f9 | ||
|
|
82ef34c085 | ||
|
|
e312ece25f | ||
|
|
fffac0f905 | ||
|
|
01e4f7f8cd | ||
|
|
6f24dac816 | ||
|
|
f3c3c65374 | ||
|
|
035d9a56d7 | ||
|
|
6966779f3c | ||
|
|
bed0470b73 | ||
|
|
43b55d8e56 | ||
|
|
bdf7a1681c | ||
|
|
592a99e669 | ||
|
|
3269ee2266 | ||
|
|
b0fa3ddd56 | ||
|
|
4c2317ddd4 | ||
|
|
f866af5ee8 | ||
|
|
f3f691431b | ||
|
|
38ba665ef1 | ||
|
|
6929ca1963 | ||
|
|
23bde6b716 | ||
|
|
0b2df61c2e | ||
|
|
d30b331b3c | ||
|
|
1fa026f0ee | ||
|
|
3a39c65829 | ||
|
|
b174a81562 | ||
|
|
077421d59c | ||
|
|
fcf999ff61 | ||
|
|
3e2f798ccf | ||
|
|
69faecf339 | ||
|
|
aa3a6854ae | ||
|
|
bb6edfff98 | ||
|
|
f7b495d22f | ||
|
|
eaf63f307c | ||
|
|
905f778204 | ||
|
|
0b15711b23 | ||
|
|
a51b193f4b | ||
|
|
42a7938d38 | ||
|
|
5783a95db3 | ||
|
|
785afe7884 | ||
|
|
28fea2e5dc | ||
|
|
dcc59fde35 | ||
|
|
4e9ac8b3a2 | ||
|
|
5d5b74613b | ||
|
|
44c149e8d5 | ||
|
|
c262525341 | ||
|
|
7da24df76f | ||
|
|
2664307517 | ||
|
|
8c0f0f86b6 | ||
|
|
b651f53e8a | ||
|
|
10fad29923 | ||
|
|
d9962a89a7 | ||
|
|
60b189d101 | ||
|
|
6b065815b7 | ||
|
|
8c943b43f0 | ||
|
|
8b813115a9 | ||
|
|
4a6bb0ba87 | ||
|
|
a2daf0f493 | ||
|
|
0e3218c7be | ||
|
|
d98c5e19fc | ||
|
|
e086099d6c | ||
|
|
df3bec4d6c | ||
|
|
4b946175bf | ||
|
|
0e891f64a2 | ||
|
|
47cc38d89e | ||
|
|
d2f9060b5c | ||
|
|
c36cc504eb | ||
|
|
8d3b3a8493 | ||
|
|
e7955ca7bf | ||
|
|
016cd09849 | ||
|
|
23846d6100 | ||
|
|
0b247b709e | ||
|
|
bfee53a9b1 | ||
|
|
70a3c98aca | ||
|
|
a923124108 | ||
|
|
92484c0333 | ||
|
|
eb21452a83 | ||
|
|
433fe963e2 | ||
|
|
7a2390ddf7 | ||
|
|
1c6a14d17a | ||
|
|
0ba64f7979 | ||
|
|
38720e96a9 | ||
|
|
0f7d9b2adc | ||
|
|
210fc246ac | ||
|
|
df0d037f66 | ||
|
|
07ea309a47 | ||
|
|
1f09f53a88 | ||
|
|
f356921daa | ||
|
|
3d50ef03f7 | ||
|
|
7b309eb2d2 | ||
|
|
b22b0642ed | ||
|
|
1cbc9195c4 | ||
|
|
b853dd0b6e | ||
|
|
f7df60419c | ||
|
|
9f76cae55e | ||
|
|
aca5a9ff4c | ||
|
|
a6ce86d702 | ||
|
|
4392c89ec7 | ||
|
|
d74a31ba7f | ||
|
|
cb3195900f | ||
|
|
cf4b91f44d | ||
|
|
33ecf8d5f5 | ||
|
|
39a2293a45 | ||
|
|
88c93995df | ||
|
|
6afe5ff41f | ||
|
|
a3a8863f46 | ||
|
|
fcfee5116b | ||
|
|
3f2d91014b | ||
|
|
41149a83b3 | ||
|
|
1ed882e8f3 | ||
|
|
0f6e0de29c | ||
|
|
238bc532c3 | ||
|
|
6919848ab3 | ||
|
|
86aec88de4 | ||
|
|
f609d57a0c | ||
|
|
f3852a3c24 | ||
|
|
804ff6a81c | ||
|
|
7869f90edd | ||
|
|
2b72306b3d | ||
|
|
f0d5d4b93f | ||
|
|
4e4ab80b2f | ||
|
|
c33d08afda | ||
|
|
a246ac38f5 | ||
|
|
7bdaa81dee | ||
|
|
6a1d831849 | ||
|
|
95d2d1dfa3 | ||
|
|
d12dd179c2 | ||
|
|
ceda5eb8ee | ||
|
|
1301aaac76 | ||
|
|
5f7468a9a4 | ||
|
|
aa24c888a3 | ||
|
|
c792d9b6ea | ||
|
|
a921b95404 | ||
|
|
e46df069a9 | ||
|
|
c08f4f24ca | ||
|
|
67b3937824 | ||
|
|
17e1623342 | ||
|
|
d12fbf05b0 | ||
|
|
efa2d44e76 | ||
|
|
acdb46cea0 | ||
|
|
c1807516f5 | ||
|
|
ab796dff93 | ||
|
|
2d98f909de |
2
.github/workflows/check.yml
vendored
2
.github/workflows/check.yml
vendored
@@ -9,6 +9,8 @@ jobs:
|
||||
env:
|
||||
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
|
||||
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
|
||||
# to save corepack from itself
|
||||
COREPACK_INTEGRITY_KEYS: 0
|
||||
name: Check & Publish
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 60
|
||||
|
||||
27
.github/workflows/docker.yml
vendored
27
.github/workflows/docker.yml
vendored
@@ -1,4 +1,4 @@
|
||||
name: Create Docker images on tag
|
||||
name: Create Docker images on Release
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
@@ -11,6 +11,10 @@ on:
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
release-tag:
|
||||
description: 'Kestra Release Tag'
|
||||
required: false
|
||||
type: string
|
||||
plugin-version:
|
||||
description: 'Plugin version'
|
||||
required: false
|
||||
@@ -38,7 +42,6 @@ jobs:
|
||||
name: Publish Docker
|
||||
needs: [ plugins ]
|
||||
runs-on: ubuntu-latest
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
strategy:
|
||||
matrix:
|
||||
image:
|
||||
@@ -57,10 +60,19 @@ jobs:
|
||||
- name: Set image name
|
||||
id: vars
|
||||
run: |
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
if [[ "${{ inputs.release-tag }}" == "" ]]; then
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
else
|
||||
TAG="${{ inputs.release-tag }}"
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
|
||||
echo "plugins=--repositories=https://s01.oss.sonatype.org/content/repositories/snapshots ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
|
||||
else
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
# Download release
|
||||
- name: Download release
|
||||
uses: robinraju/release-downloader@v1.11
|
||||
@@ -77,6 +89,11 @@ jobs:
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Fix Qemu
|
||||
shell: bash
|
||||
run: |
|
||||
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
releaseVersion:
|
||||
description: 'The release version (e.g., 0.21.0)'
|
||||
description: 'The release version (e.g., 0.21.0-rc1)'
|
||||
required: true
|
||||
type: string
|
||||
nextVersion:
|
||||
@@ -18,13 +18,29 @@ on:
|
||||
jobs:
|
||||
release:
|
||||
name: Release plugins
|
||||
runs-on: kestra-private-standard
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
ref: main
|
||||
|
||||
# Setup build
|
||||
- uses: ./actions/.github/actions/setup-build
|
||||
id: build
|
||||
with:
|
||||
java-enabled: true
|
||||
node-enabled: true
|
||||
python-enabled: true
|
||||
caches-enabled: true
|
||||
|
||||
# Get Plugins List
|
||||
- name: Get Plugins List
|
||||
uses: ./.github/actions/plugins-list
|
||||
@@ -33,14 +49,20 @@ jobs:
|
||||
with:
|
||||
plugin-version: 'LATEST'
|
||||
|
||||
- name: 'Configure Git'
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
|
||||
# Execute
|
||||
- name: Run Gradle Release
|
||||
if: ${{ github.event.inputs.dryRun == 'false' }}
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./release-plugins.sh;
|
||||
./release-plugins.sh \
|
||||
chmod +x ./dev-tools/release-plugins.sh;
|
||||
|
||||
./dev-tools/release-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--next-version=${{github.event.inputs.nextVersion}} \
|
||||
--yes \
|
||||
@@ -51,8 +73,9 @@ jobs:
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./release-plugins.sh;
|
||||
./release-plugins.sh \
|
||||
chmod +x ./dev-tools/release-plugins.sh;
|
||||
|
||||
./dev-tools/release-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--next-version=${{github.event.inputs.nextVersion}} \
|
||||
--dry-run \
|
||||
89
.github/workflows/gradle-release.yml
vendored
Normal file
89
.github/workflows/gradle-release.yml
vendored
Normal file
@@ -0,0 +1,89 @@
|
||||
name: Run Gradle Release
|
||||
run-name: "Releasing Kestra ${{ github.event.inputs.releaseVersion }} 🚀"
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
releaseVersion:
|
||||
description: 'The release version (e.g., 0.21.0-rc1)'
|
||||
required: true
|
||||
type: string
|
||||
nextVersion:
|
||||
description: 'The next version (e.g., 0.22.0-SNAPSHOT)'
|
||||
required: true
|
||||
type: string
|
||||
env:
|
||||
RELEASE_VERSION: "${{ github.event.inputs.releaseVersion }}"
|
||||
NEXT_VERSION: "${{ github.event.inputs.nextVersion }}"
|
||||
jobs:
|
||||
release:
|
||||
name: Release Kestra
|
||||
runs-on: ubuntu-latest
|
||||
if: github.ref == 'refs/heads/develop'
|
||||
steps:
|
||||
# Checks
|
||||
- name: Check Inputs
|
||||
run: |
|
||||
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$ ]]; then
|
||||
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if ! [[ "$NEXT_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$ ]]; then
|
||||
echo "Invalid next version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$"
|
||||
exit 1;
|
||||
fi
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
ref: main
|
||||
|
||||
# Setup build
|
||||
- uses: ./actions/.github/actions/setup-build
|
||||
id: build
|
||||
with:
|
||||
java-enabled: true
|
||||
node-enabled: true
|
||||
python-enabled: true
|
||||
caches-enabled: true
|
||||
|
||||
- name: Configure Git
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
|
||||
# Execute
|
||||
- name: Run Gradle Release
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
# Extract the major and minor versions
|
||||
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
|
||||
PUSH_RELEASE_BRANCH="releases/v${BASE_VERSION}.x"
|
||||
|
||||
# Create and push release branch
|
||||
git checkout -b "$PUSH_RELEASE_BRANCH";
|
||||
git push -u origin "$PUSH_RELEASE_BRANCH";
|
||||
|
||||
# Run gradle release
|
||||
git checkout develop;
|
||||
|
||||
if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
|
||||
# -SNAPSHOT qualifier maybe used to test release-candidates
|
||||
./gradlew release -Prelease.useAutomaticVersion=true \
|
||||
-Prelease.releaseVersion="${RELEASE_VERSION}" \
|
||||
-Prelease.newVersion="${NEXT_VERSION}" \
|
||||
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}" \
|
||||
-Prelease.failOnSnapshotDependencies=false
|
||||
else
|
||||
./gradlew release -Prelease.useAutomaticVersion=true \
|
||||
-Prelease.releaseVersion="${RELEASE_VERSION}" \
|
||||
-Prelease.newVersion="${NEXT_VERSION}" \
|
||||
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
|
||||
fi
|
||||
21
.github/workflows/main.yml
vendored
21
.github/workflows/main.yml
vendored
@@ -35,6 +35,8 @@ env:
|
||||
DOCKER_APT_PACKAGES: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip
|
||||
DOCKER_PYTHON_LIBRARIES: kestra
|
||||
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
# to save corepack from itself
|
||||
COREPACK_INTEGRITY_KEYS: 0
|
||||
jobs:
|
||||
build-artifacts:
|
||||
name: Build Artifacts
|
||||
@@ -45,13 +47,14 @@ jobs:
|
||||
docker-artifact-name: ${{ steps.vars.outputs.artifact }}
|
||||
plugins: ${{ steps.plugins-list.outputs.plugins }}
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Checkout current ref
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- name: Checkout GitHub Actions
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
@@ -125,6 +128,11 @@ jobs:
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Fix Qemu
|
||||
shell: bash
|
||||
run: |
|
||||
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
@@ -403,6 +411,11 @@ jobs:
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Fix Qemu
|
||||
shell: bash
|
||||
run: |
|
||||
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
name: Update and Tag Kestra Plugins
|
||||
name: Set Version and Tag Plugins
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
@@ -14,7 +14,7 @@ on:
|
||||
jobs:
|
||||
tag:
|
||||
name: Release plugins
|
||||
runs-on: kestra-private-standard
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
@@ -29,25 +29,32 @@ jobs:
|
||||
with:
|
||||
plugin-version: 'LATEST'
|
||||
|
||||
- name: 'Configure Git'
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
|
||||
# Execute
|
||||
- name: Tag Plugins
|
||||
- name: Set Version and Tag Plugins
|
||||
if: ${{ github.event.inputs.dryRun == 'false' }}
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./tag-release-plugins.sh;
|
||||
./tag-release-plugins.sh \
|
||||
chmod +x ./dev-tools/setversion-tag-plugins.sh;
|
||||
|
||||
./dev-tools/setversion-tag-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--yes \
|
||||
${{ steps.plugins-list.outputs.repositories }}
|
||||
|
||||
- name: Run Gradle Release (DRY_RUN)
|
||||
- name: Set Version and Tag Plugins (DRY_RUN)
|
||||
if: ${{ github.event.inputs.dryRun == 'true' }}
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./tag-release-plugins.sh;
|
||||
./tag-release-plugins.sh \
|
||||
chmod +x ./dev-tools/setversion-tag-plugins.sh;
|
||||
|
||||
./dev-tools/setversion-tag-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--dry-run \
|
||||
--yes \
|
||||
58
.github/workflows/setversion-tag.yml
vendored
Normal file
58
.github/workflows/setversion-tag.yml
vendored
Normal file
@@ -0,0 +1,58 @@
|
||||
name: Set Version and Tag
|
||||
run-name: "Set version and Tag Kestra to ${{ github.event.inputs.releaseVersion }} 🚀"
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
releaseVersion:
|
||||
description: 'The release version (e.g., 0.21.1)'
|
||||
required: true
|
||||
type: string
|
||||
env:
|
||||
RELEASE_VERSION: "${{ github.event.inputs.releaseVersion }}"
|
||||
jobs:
|
||||
release:
|
||||
name: Release Kestra
|
||||
runs-on: ubuntu-latest
|
||||
if: startsWith(github.ref, 'refs/heads/releases/v')
|
||||
steps:
|
||||
# Checks
|
||||
- name: Check Inputs
|
||||
run: |
|
||||
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)(\.[0-9]+)(-rc[0-9])?(-SNAPSHOT)?$ ]]; then
|
||||
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)(\.[0-9]+)-(rc[0-9])?(-SNAPSHOT)?$"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
CURRENT_BRANCH="{{ github.ref }}"
|
||||
|
||||
# Extract the major and minor versions
|
||||
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
|
||||
RELEASE_BRANCH="refs/heads/releases/v${BASE_VERSION}.x"
|
||||
|
||||
if ! [[ "$CURRENT_BRANCH" == "$RELEASE_BRANCH" ]]; then
|
||||
echo "Invalid release branch. Expected $RELEASE_BRANCH, was $CURRENT_BRANCH"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Configure Git
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
|
||||
# Execute
|
||||
- name: Run Gradle Release
|
||||
env:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
# Update version
|
||||
sed -i "s/^version=.*/version=$RELEASE_VERSION/" ./gradle.properties
|
||||
git add ./gradle.properties
|
||||
git commit -m"chore(version): update to version '$RELEASE_VERSION'"
|
||||
git push
|
||||
git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
|
||||
git push origin "v$RELEASE_VERSION"
|
||||
1
.plugins
1
.plugins
@@ -40,6 +40,7 @@
|
||||
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-db2:LATEST
|
||||
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-duckdb:LATEST
|
||||
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-druid:LATEST
|
||||
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-mariadb:LATEST
|
||||
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-mysql:LATEST
|
||||
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-oracle:LATEST
|
||||
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-pinot:LATEST
|
||||
|
||||
1064
CHANGELOG.md
Normal file
1064
CHANGELOG.md
Normal file
File diff suppressed because it is too large
Load Diff
@@ -39,7 +39,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
|
||||
Flow flow = (Flow) object;
|
||||
List<String> warnings = new ArrayList<>();
|
||||
warnings.addAll(flowService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
|
||||
warnings.addAll(flowService.warnings(flow));
|
||||
warnings.addAll(flowService.warnings(flow, this.tenantId));
|
||||
return warnings;
|
||||
},
|
||||
(Object object) -> {
|
||||
|
||||
@@ -3,7 +3,7 @@ package io.kestra.cli.commands.plugins;
|
||||
import org.apache.commons.io.FilenameUtils;
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.cli.plugins.PluginDownloader;
|
||||
import io.kestra.cli.plugins.RepositoryConfig;
|
||||
import io.kestra.cli.plugins.MavenPluginRepositoryConfig;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import picocli.CommandLine;
|
||||
@@ -59,13 +59,13 @@ public class PluginInstallCommand extends AbstractCommand {
|
||||
.forEach(throwConsumer(s -> {
|
||||
URIBuilder uriBuilder = new URIBuilder(s);
|
||||
|
||||
RepositoryConfig.RepositoryConfigBuilder builder = RepositoryConfig.builder()
|
||||
var builder = MavenPluginRepositoryConfig.builder()
|
||||
.id(IdUtils.create());
|
||||
|
||||
if (uriBuilder.getUserInfo() != null) {
|
||||
int index = uriBuilder.getUserInfo().indexOf(":");
|
||||
|
||||
builder.basicAuth(new RepositoryConfig.BasicAuth(
|
||||
builder.basicAuth(new MavenPluginRepositoryConfig.BasicAuth(
|
||||
uriBuilder.getUserInfo().substring(0, index),
|
||||
uriBuilder.getUserInfo().substring(index + 1)
|
||||
));
|
||||
|
||||
@@ -91,9 +91,10 @@ public class StandAloneCommand extends AbstractServerCommand {
|
||||
this.skipExecutionService.setSkipFlows(skipFlows);
|
||||
this.skipExecutionService.setSkipNamespaces(skipNamespaces);
|
||||
this.skipExecutionService.setSkipTenants(skipTenants);
|
||||
|
||||
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);
|
||||
|
||||
KestraContext.getContext().injectWorkerConfigs(workerThread, null);
|
||||
|
||||
super.call();
|
||||
this.shutdownHook(() -> KestraContext.getContext().shutdown());
|
||||
|
||||
|
||||
@@ -39,8 +39,13 @@ public class WorkerCommand extends AbstractServerCommand {
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
|
||||
KestraContext.getContext().injectWorkerConfigs(thread, workerGroupKey);
|
||||
|
||||
super.call();
|
||||
|
||||
this.shutdownHook(() -> KestraContext.getContext().shutdown());
|
||||
|
||||
if (this.workerGroupKey != null && !this.workerGroupKey.matches("[a-zA-Z0-9_-]+")) {
|
||||
throw new IllegalArgumentException("The --worker-group option must match the [a-zA-Z0-9_-]+ pattern");
|
||||
}
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
package io.kestra.cli.plugins;
|
||||
|
||||
import io.micronaut.context.annotation.ConfigurationProperties;
|
||||
import io.micronaut.context.annotation.EachProperty;
|
||||
import io.micronaut.context.annotation.Parameter;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
|
||||
@Builder
|
||||
@EachProperty("kestra.plugins.repositories")
|
||||
public record MavenPluginRepositoryConfig(
|
||||
@Parameter
|
||||
String id,
|
||||
String url,
|
||||
|
||||
@Nullable
|
||||
BasicAuth basicAuth
|
||||
) {
|
||||
|
||||
@Builder
|
||||
@ConfigurationProperties("basic-auth")
|
||||
public record BasicAuth(
|
||||
String username,
|
||||
String password
|
||||
) {
|
||||
|
||||
}
|
||||
}
|
||||
@@ -11,7 +11,6 @@ import org.apache.maven.repository.internal.MavenRepositorySystemUtils;
|
||||
import org.eclipse.aether.DefaultRepositorySystemSession;
|
||||
import org.eclipse.aether.RepositorySystem;
|
||||
import org.eclipse.aether.RepositorySystemSession;
|
||||
import org.eclipse.aether.artifact.Artifact;
|
||||
import org.eclipse.aether.artifact.DefaultArtifact;
|
||||
import org.eclipse.aether.connector.basic.BasicRepositoryConnectorFactory;
|
||||
import org.eclipse.aether.impl.DefaultServiceLocator;
|
||||
@@ -31,18 +30,17 @@ import java.net.URL;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class PluginDownloader {
|
||||
private final List<RepositoryConfig> repositoryConfigs;
|
||||
private final List<MavenPluginRepositoryConfig> repositoryConfigs;
|
||||
private final RepositorySystem system;
|
||||
private final RepositorySystemSession session;
|
||||
|
||||
@Inject
|
||||
public PluginDownloader(
|
||||
List<RepositoryConfig> repositoryConfigs,
|
||||
List<MavenPluginRepositoryConfig> repositoryConfigs,
|
||||
@Nullable @Value("${kestra.plugins.local-repository-path}") String localRepositoryPath
|
||||
) {
|
||||
this.repositoryConfigs = repositoryConfigs;
|
||||
@@ -50,7 +48,7 @@ public class PluginDownloader {
|
||||
this.session = repositorySystemSession(system, localRepositoryPath);
|
||||
}
|
||||
|
||||
public void addRepository(RepositoryConfig repositoryConfig) {
|
||||
public void addRepository(MavenPluginRepositoryConfig repositoryConfig) {
|
||||
this.repositoryConfigs.add(repositoryConfig);
|
||||
}
|
||||
|
||||
@@ -69,15 +67,15 @@ public class PluginDownloader {
|
||||
.stream()
|
||||
.map(repositoryConfig -> {
|
||||
var build = new RemoteRepository.Builder(
|
||||
repositoryConfig.getId(),
|
||||
repositoryConfig.id(),
|
||||
"default",
|
||||
repositoryConfig.getUrl()
|
||||
repositoryConfig.url()
|
||||
);
|
||||
|
||||
if (repositoryConfig.getBasicAuth() != null) {
|
||||
if (repositoryConfig.basicAuth() != null) {
|
||||
var authenticationBuilder = new AuthenticationBuilder();
|
||||
authenticationBuilder.addUsername(repositoryConfig.getBasicAuth().getUsername());
|
||||
authenticationBuilder.addPassword(repositoryConfig.getBasicAuth().getPassword());
|
||||
authenticationBuilder.addUsername(repositoryConfig.basicAuth().username());
|
||||
authenticationBuilder.addPassword(repositoryConfig.basicAuth().password());
|
||||
|
||||
build.setAuthentication(authenticationBuilder.build());
|
||||
}
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
package io.kestra.cli.plugins;
|
||||
|
||||
import io.micronaut.context.annotation.EachProperty;
|
||||
import io.micronaut.context.annotation.Parameter;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
|
||||
@EachProperty("kestra.plugins.repositories")
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class RepositoryConfig {
|
||||
String id;
|
||||
|
||||
String url;
|
||||
|
||||
BasicAuth basicAuth;
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public static class BasicAuth {
|
||||
private String username;
|
||||
private String password;
|
||||
}
|
||||
|
||||
public RepositoryConfig(@Parameter String id) {
|
||||
this.id = id;
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import io.kestra.core.models.validations.ModelValidator;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import io.kestra.core.services.FlowListenersInterface;
|
||||
import io.kestra.core.services.PluginDefaultService;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
|
||||
@@ -36,6 +37,9 @@ public class FileChangedEventListener {
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepositoryInterface;
|
||||
|
||||
@Inject
|
||||
private PluginDefaultService pluginDefaultService;
|
||||
|
||||
@Inject
|
||||
private YamlParser yamlParser;
|
||||
|
||||
@@ -64,7 +68,7 @@ public class FileChangedEventListener {
|
||||
|
||||
public void startListeningFromConfig() throws IOException, InterruptedException {
|
||||
if (fileWatchConfiguration != null && fileWatchConfiguration.isEnabled()) {
|
||||
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface);
|
||||
this.flowFilesManager = new LocalFlowFileWatcher(flowRepositoryInterface, pluginDefaultService);
|
||||
List<Path> paths = fileWatchConfiguration.getPaths();
|
||||
this.setup(paths);
|
||||
|
||||
@@ -107,7 +111,6 @@ public class FileChangedEventListener {
|
||||
} else {
|
||||
log.info("File watching is disabled.");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void startListening(List<Path> paths) throws IOException, InterruptedException {
|
||||
@@ -118,60 +121,64 @@ public class FileChangedEventListener {
|
||||
WatchKey key;
|
||||
while ((key = watchService.take()) != null) {
|
||||
for (WatchEvent<?> watchEvent : key.pollEvents()) {
|
||||
WatchEvent.Kind<?> kind = watchEvent.kind();
|
||||
Path entry = (Path) watchEvent.context();
|
||||
try {
|
||||
WatchEvent.Kind<?> kind = watchEvent.kind();
|
||||
Path entry = (Path) watchEvent.context();
|
||||
|
||||
if (entry.toString().endsWith(".yml") || entry.toString().endsWith(".yaml")) {
|
||||
if (entry.toString().endsWith(".yml") || entry.toString().endsWith(".yaml")) {
|
||||
|
||||
if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
|
||||
if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
|
||||
|
||||
Path filePath = ((Path) key.watchable()).resolve(entry);
|
||||
if (Files.isDirectory(filePath)) {
|
||||
loadFlowsFromFolder(filePath);
|
||||
} else {
|
||||
Path filePath = ((Path) key.watchable()).resolve(entry);
|
||||
if (Files.isDirectory(filePath)) {
|
||||
loadFlowsFromFolder(filePath);
|
||||
} else {
|
||||
|
||||
try {
|
||||
String content = Files.readString(filePath, Charset.defaultCharset());
|
||||
try {
|
||||
String content = Files.readString(filePath, Charset.defaultCharset());
|
||||
|
||||
Optional<Flow> flow = parseFlow(content, entry);
|
||||
if (flow.isPresent()) {
|
||||
if (kind == StandardWatchEventKinds.ENTRY_MODIFY) {
|
||||
// Check if we already have a file with the given path
|
||||
if (flows.stream().anyMatch(flowWithPath -> flowWithPath.getPath().equals(filePath.toString()))) {
|
||||
Optional<FlowWithPath> previous = flows.stream().filter(flowWithPath -> flowWithPath.getPath().equals(filePath.toString())).findFirst();
|
||||
// Check if Flow from file has id/namespace updated
|
||||
if (previous.isPresent() && !previous.get().uidWithoutRevision().equals(flow.get().uidWithoutRevision())) {
|
||||
flows.removeIf(flowWithPath -> flowWithPath.getPath().equals(filePath.toString()));
|
||||
flowFilesManager.deleteFlow(previous.get().getTenantId(), previous.get().getNamespace(), previous.get().getId());
|
||||
Optional<Flow> flow = parseFlow(content, entry);
|
||||
if (flow.isPresent()) {
|
||||
if (kind == StandardWatchEventKinds.ENTRY_MODIFY) {
|
||||
// Check if we already have a file with the given path
|
||||
if (flows.stream().anyMatch(flowWithPath -> flowWithPath.getPath().equals(filePath.toString()))) {
|
||||
Optional<FlowWithPath> previous = flows.stream().filter(flowWithPath -> flowWithPath.getPath().equals(filePath.toString())).findFirst();
|
||||
// Check if Flow from file has id/namespace updated
|
||||
if (previous.isPresent() && !previous.get().uidWithoutRevision().equals(flow.get().uidWithoutRevision())) {
|
||||
flows.removeIf(flowWithPath -> flowWithPath.getPath().equals(filePath.toString()));
|
||||
flowFilesManager.deleteFlow(previous.get().getTenantId(), previous.get().getNamespace(), previous.get().getId());
|
||||
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
|
||||
}
|
||||
} else {
|
||||
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
|
||||
}
|
||||
} else {
|
||||
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
|
||||
}
|
||||
} else {
|
||||
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
|
||||
|
||||
flowFilesManager.createOrUpdateFlow(flow.get(), content);
|
||||
log.info("Flow {} from file {} has been created or modified", flow.get().getId(), entry);
|
||||
}
|
||||
|
||||
flowFilesManager.createOrUpdateFlow(flow.get(), content);
|
||||
log.info("Flow {} from file {} has been created or modified", flow.get().getId(), entry);
|
||||
} catch (NoSuchFileException e) {
|
||||
log.error("File not found: {}", entry, e);
|
||||
} catch (IOException e) {
|
||||
log.error("Error reading file: {}", entry, e);
|
||||
}
|
||||
|
||||
} catch (NoSuchFileException e) {
|
||||
log.error("File not found: {}", entry, e);
|
||||
} catch (IOException e) {
|
||||
log.error("Error reading file: {}", entry, e);
|
||||
}
|
||||
} else {
|
||||
Path filePath = ((Path) key.watchable()).resolve(entry);
|
||||
flows.stream()
|
||||
.filter(flow -> flow.getPath().equals(filePath.toString()))
|
||||
.findFirst()
|
||||
.ifPresent(flowWithPath -> {
|
||||
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
|
||||
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
|
||||
});
|
||||
}
|
||||
} else {
|
||||
Path filePath = ((Path) key.watchable()).resolve(entry);
|
||||
flows.stream()
|
||||
.filter(flow -> flow.getPath().equals(filePath.toString()))
|
||||
.findFirst()
|
||||
.ifPresent(flowWithPath -> {
|
||||
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
|
||||
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Unexpected error while watching flows", e);
|
||||
}
|
||||
}
|
||||
key.reset();
|
||||
@@ -230,7 +237,8 @@ public class FileChangedEventListener {
|
||||
private Optional<Flow> parseFlow(String content, Path entry) {
|
||||
try {
|
||||
Flow flow = yamlParser.parse(content, Flow.class);
|
||||
modelValidator.validate(flow);
|
||||
FlowWithSource withPluginDefault = pluginDefaultService.injectDefaults(FlowWithSource.of(flow, content));
|
||||
modelValidator.validate(withPluginDefault);
|
||||
return Optional.of(flow);
|
||||
} catch (ConstraintViolationException e) {
|
||||
log.warn("Error while parsing flow: {}", entry, e);
|
||||
|
||||
@@ -3,32 +3,36 @@ package io.kestra.cli.services;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.kestra.core.services.PluginDefaultService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Requires(property = "micronaut.io.watch.enabled", value = "true")
|
||||
@Slf4j
|
||||
public class LocalFlowFileWatcher implements FlowFilesManager {
|
||||
private FlowRepositoryInterface flowRepositoryInterface;
|
||||
private final FlowRepositoryInterface flowRepository;
|
||||
private final PluginDefaultService pluginDefaultService;
|
||||
|
||||
public LocalFlowFileWatcher(FlowRepositoryInterface flowRepositoryInterface) {
|
||||
this.flowRepositoryInterface = flowRepositoryInterface;
|
||||
public LocalFlowFileWatcher(FlowRepositoryInterface flowRepository, PluginDefaultService pluginDefaultService) {
|
||||
this.flowRepository = flowRepository;
|
||||
this.pluginDefaultService = pluginDefaultService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlowWithSource createOrUpdateFlow(Flow flow, String content) {
|
||||
return flowRepositoryInterface.findById(null, flow.getNamespace(), flow.getId())
|
||||
.map(previous -> flowRepositoryInterface.update(flow, previous, content, flow))
|
||||
.orElseGet(() -> flowRepositoryInterface.create(flow, content, flow));
|
||||
FlowWithSource withDefault = pluginDefaultService.injectDefaults(FlowWithSource.of(flow, content));
|
||||
return flowRepository.findById(null, flow.getNamespace(), flow.getId())
|
||||
.map(previous -> flowRepository.update(flow, previous, content, withDefault))
|
||||
.orElseGet(() -> flowRepository.create(flow, content, withDefault));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteFlow(FlowWithSource toDelete) {
|
||||
flowRepositoryInterface.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepositoryInterface::delete);
|
||||
log.error("Flow {} has been deleted", toDelete.getId());
|
||||
flowRepository.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepository::delete);
|
||||
log.info("Flow {} has been deleted", toDelete.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteFlow(String tenantId, String namespace, String id) {
|
||||
flowRepositoryInterface.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepositoryInterface::delete);
|
||||
log.error("Flow {} has been deleted", id);
|
||||
flowRepository.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepository::delete);
|
||||
log.info("Flow {} has been deleted", id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,131 @@
|
||||
package io.kestra.cli.services;
|
||||
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junitpioneer.jupiter.RetryingTest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwRunnable;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
@MicronautTest(environments = {"test", "file-watch"}, transactional = false)
|
||||
class FileChangedEventListenerTest {
|
||||
public static final String FILE_WATCH = "build/file-watch";
|
||||
@Inject
|
||||
private FileChangedEventListener fileWatcher;
|
||||
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
|
||||
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||
|
||||
@BeforeAll
|
||||
static void setup() throws IOException {
|
||||
if (!Files.exists(Path.of(FILE_WATCH))) {
|
||||
Files.createDirectories(Path.of(FILE_WATCH));
|
||||
}
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void tearDown() throws IOException {
|
||||
if (Files.exists(Path.of(FILE_WATCH))) {
|
||||
FileUtils.deleteDirectory(Path.of(FILE_WATCH).toFile());
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws Exception {
|
||||
if (started.compareAndSet(false, true)) {
|
||||
executorService.execute(throwRunnable(() -> fileWatcher.startListeningFromConfig()));
|
||||
}
|
||||
}
|
||||
|
||||
@RetryingTest(5) // Flaky on CI but always pass locally
|
||||
void test() throws IOException, TimeoutException {
|
||||
// remove the flow if it already exists
|
||||
flowRepository.findByIdWithSource(null, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
|
||||
|
||||
// create a basic flow
|
||||
String flow = """
|
||||
id: myflow
|
||||
namespace: io.kestra.tests.watch
|
||||
|
||||
tasks:
|
||||
- id: hello
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Hello World! 🚀
|
||||
""";
|
||||
Files.write(Path.of(FILE_WATCH + "/myflow.yaml"), flow.getBytes());
|
||||
Await.until(
|
||||
() -> flowRepository.findById(null, "io.kestra.tests.watch", "myflow").isPresent(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
Flow myflow = flowRepository.findById(null, "io.kestra.tests.watch", "myflow").orElseThrow();
|
||||
assertThat(myflow.getTasks(), hasSize(1));
|
||||
assertThat(myflow.getTasks().getFirst().getId(), is("hello"));
|
||||
assertThat(myflow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));
|
||||
|
||||
// delete the flow
|
||||
Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
|
||||
Await.until(
|
||||
() -> flowRepository.findById(null, "io.kestra.tests.watch", "myflow").isEmpty(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
}
|
||||
|
||||
@RetryingTest(5) // Flaky on CI but always pass locally
|
||||
void testWithPluginDefault() throws IOException, TimeoutException {
|
||||
// remove the flow if it already exists
|
||||
flowRepository.findByIdWithSource(null, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
|
||||
|
||||
// create a flow with plugin default
|
||||
String pluginDefault = """
|
||||
id: pluginDefault
|
||||
namespace: io.kestra.tests.watch
|
||||
|
||||
tasks:
|
||||
- id: helloWithDefault
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
|
||||
pluginDefaults:
|
||||
- type: io.kestra.plugin.core.log.Log
|
||||
values:
|
||||
message: Hello World!
|
||||
""";
|
||||
Files.write(Path.of(FILE_WATCH + "/plugin-default.yaml"), pluginDefault.getBytes());
|
||||
Await.until(
|
||||
() -> flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").isPresent(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
Flow pluginDefaultFlow = flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
|
||||
assertThat(pluginDefaultFlow.getTasks(), hasSize(1));
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getId(), is("helloWithDefault"));
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));
|
||||
|
||||
// delete both files
|
||||
Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));
|
||||
Await.until(
|
||||
() -> flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
}
|
||||
}
|
||||
12
cli/src/test/resources/application-file-watch.yml
Normal file
12
cli/src/test/resources/application-file-watch.yml
Normal file
@@ -0,0 +1,12 @@
|
||||
micronaut:
|
||||
io:
|
||||
watch:
|
||||
enabled: true
|
||||
paths:
|
||||
- build/file-watch
|
||||
|
||||
kestra:
|
||||
repository:
|
||||
type: memory
|
||||
queue:
|
||||
type: memory
|
||||
@@ -10,6 +10,8 @@ import io.micronaut.context.env.Environment;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@@ -27,6 +29,10 @@ public abstract class KestraContext {
|
||||
// Properties
|
||||
private static final String KESTRA_SERVER_TYPE = "kestra.server-type";
|
||||
|
||||
// Those properties are injected bases on the CLI args.
|
||||
private static final String KESTRA_WORKER_MAX_NUM_THREADS = "kestra.worker.max-num-threads";
|
||||
private static final String KESTRA_WORKER_GROUP_KEY = "kestra.worker.group-key";
|
||||
|
||||
/**
|
||||
* Gets the current {@link KestraContext}.
|
||||
*
|
||||
@@ -54,6 +60,12 @@ public abstract class KestraContext {
|
||||
*/
|
||||
public abstract ServerType getServerType();
|
||||
|
||||
public abstract Optional<Integer> getWorkerMaxNumThreads();
|
||||
|
||||
public abstract Optional<String> getWorkerGroupKey();
|
||||
|
||||
public abstract void injectWorkerConfigs(Integer maxNumThreads, String workerGroupKey);
|
||||
|
||||
/**
|
||||
* Returns the Kestra Version.
|
||||
*
|
||||
@@ -110,6 +122,34 @@ public abstract class KestraContext {
|
||||
.orElse(ServerType.STANDALONE);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} **/
|
||||
@Override
|
||||
public Optional<Integer> getWorkerMaxNumThreads() {
|
||||
return Optional.ofNullable(environment)
|
||||
.flatMap(env -> env.getProperty(KESTRA_WORKER_MAX_NUM_THREADS, Integer.class));
|
||||
}
|
||||
|
||||
/** {@inheritDoc} **/
|
||||
@Override
|
||||
public Optional<String> getWorkerGroupKey() {
|
||||
return Optional.ofNullable(environment)
|
||||
.flatMap(env -> env.getProperty(KESTRA_WORKER_GROUP_KEY, String.class));
|
||||
}
|
||||
/** {@inheritDoc} **/
|
||||
@Override
|
||||
public void injectWorkerConfigs(Integer maxNumThreads, String workerGroupKey) {
|
||||
final Map<String, Object> configs = new HashMap<>();
|
||||
Optional.ofNullable(maxNumThreads)
|
||||
.ifPresent(val -> configs.put(KESTRA_WORKER_MAX_NUM_THREADS, val));
|
||||
|
||||
Optional.ofNullable(workerGroupKey)
|
||||
.ifPresent(val -> configs.put(KESTRA_WORKER_GROUP_KEY, val));
|
||||
|
||||
if (!configs.isEmpty()) {
|
||||
environment.addPropertySource("kestra-runtime", configs);
|
||||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} **/
|
||||
@Override
|
||||
public void shutdown() {
|
||||
|
||||
@@ -82,6 +82,7 @@ public class JsonSchemaGenerator {
|
||||
}
|
||||
replaceAnyOfWithOneOf(objectNode);
|
||||
pullOfDefaultFromOneOf(objectNode);
|
||||
removeRequiredOnPropsWithDefaults(objectNode);
|
||||
|
||||
return JacksonMapper.toMap(objectNode);
|
||||
} catch (IllegalArgumentException e) {
|
||||
@@ -89,6 +90,27 @@ public class JsonSchemaGenerator {
|
||||
}
|
||||
}
|
||||
|
||||
private void removeRequiredOnPropsWithDefaults(ObjectNode objectNode) {
|
||||
objectNode.findParents("required").forEach(jsonNode -> {
|
||||
if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
|
||||
List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
|
||||
.map(JsonNode::asText)
|
||||
.toList();
|
||||
|
||||
properties.fields().forEachRemaining(e -> {
|
||||
int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
|
||||
if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
|
||||
requiredPropsNode.remove(indexInRequiredArray);
|
||||
}
|
||||
});
|
||||
|
||||
if (requiredPropsNode.isEmpty()) {
|
||||
clazzSchema.remove("required");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void replaceAnyOfWithOneOf(ObjectNode objectNode) {
|
||||
objectNode.findParents("anyOf").forEach(jsonNode -> {
|
||||
if (jsonNode instanceof ObjectNode oNode) {
|
||||
@@ -311,10 +333,12 @@ public class JsonSchemaGenerator {
|
||||
if (member.getDeclaredType().isInstanceOf(Property.class)) {
|
||||
memberAttributes.put("$dynamic", true);
|
||||
// if we are in the String definition of a Property but the target type is not String: we configure the pattern
|
||||
Class<?> targetType = member.getDeclaredType().getTypeParameters().getFirst().getErasedType();
|
||||
if (!String.class.isAssignableFrom(targetType) && String.class.isAssignableFrom(member.getType().getErasedType())) {
|
||||
memberAttributes.put("pattern", ".*{{.*}}.*");
|
||||
}
|
||||
// TODO this was a good idea but their is too much cases where it didn't work like in List or Map so if we want it we need to make it more clever
|
||||
// I keep it for now commented but at some point we may want to re-do and improve it or remove these commented lines
|
||||
// Class<?> targetType = member.getDeclaredType().getTypeParameters().getFirst().getErasedType();
|
||||
// if (!String.class.isAssignableFrom(targetType) && String.class.isAssignableFrom(member.getType().getErasedType())) {
|
||||
// memberAttributes.put("pattern", ".*{{.*}}.*");
|
||||
// }
|
||||
} else if (member.getDeclaredType().isInstanceOf(Data.class)) {
|
||||
memberAttributes.put("$dynamic", false);
|
||||
}
|
||||
@@ -603,6 +627,7 @@ public class JsonSchemaGenerator {
|
||||
ObjectNode objectNode = generator.generateSchema(cls);
|
||||
replaceAnyOfWithOneOf(objectNode);
|
||||
pullOfDefaultFromOneOf(objectNode);
|
||||
removeRequiredOnPropsWithDefaults(objectNode);
|
||||
|
||||
return JacksonMapper.toMap(extractMainRef(objectNode));
|
||||
} catch (IllegalArgumentException e) {
|
||||
|
||||
@@ -7,6 +7,7 @@ import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.function.Predicate.not;
|
||||
@@ -40,15 +41,22 @@ public class Plugin {
|
||||
private String subGroup;
|
||||
|
||||
public static Plugin of(RegisteredPlugin registeredPlugin, @Nullable String subgroup) {
|
||||
return Plugin.of(registeredPlugin, subgroup, true);
|
||||
}
|
||||
|
||||
public static Plugin of(RegisteredPlugin registeredPlugin, @Nullable String subgroup, boolean includeDeprecated) {
|
||||
Plugin plugin = new Plugin();
|
||||
plugin.name = registeredPlugin.name();
|
||||
PluginSubGroup subGroupInfos = null;
|
||||
if (subgroup == null) {
|
||||
plugin.title = registeredPlugin.title();
|
||||
} else {
|
||||
subGroupInfos = registeredPlugin.allClass().stream().filter(c -> c.getName().contains(subgroup)).map(clazz -> clazz.getPackage().getDeclaredAnnotation(PluginSubGroup.class)).toList().getFirst();
|
||||
subGroupInfos = registeredPlugin.allClass().stream()
|
||||
.filter(c -> c.getPackageName().contains(subgroup))
|
||||
.min(Comparator.comparingInt(a -> a.getPackageName().length()))
|
||||
.map(clazz -> clazz.getPackage().getDeclaredAnnotation(PluginSubGroup.class))
|
||||
.orElseThrow();
|
||||
plugin.title = !subGroupInfos.title().isEmpty() ? subGroupInfos.title() : subgroup.substring(subgroup.lastIndexOf('.') + 1);;
|
||||
|
||||
}
|
||||
plugin.group = registeredPlugin.group();
|
||||
plugin.description = subGroupInfos != null && !subGroupInfos.description().isEmpty() ? subGroupInfos.description() : registeredPlugin.description();
|
||||
@@ -80,17 +88,18 @@ public class Plugin {
|
||||
|
||||
plugin.subGroup = subgroup;
|
||||
|
||||
plugin.tasks = filterAndGetClassName(registeredPlugin.getTasks()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
|
||||
plugin.triggers = filterAndGetClassName(registeredPlugin.getTriggers()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
|
||||
plugin.conditions = filterAndGetClassName(registeredPlugin.getConditions()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
|
||||
plugin.storages = filterAndGetClassName(registeredPlugin.getStorages()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
|
||||
plugin.secrets = filterAndGetClassName(registeredPlugin.getSecrets()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
|
||||
plugin.taskRunners = filterAndGetClassName(registeredPlugin.getTaskRunners()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
|
||||
plugin.apps = filterAndGetClassName(registeredPlugin.getApps()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
|
||||
plugin.appBlocks = filterAndGetClassName(registeredPlugin.getAppBlocks()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
|
||||
plugin.charts = filterAndGetClassName(registeredPlugin.getCharts()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
|
||||
plugin.dataFilters = filterAndGetClassName(registeredPlugin.getDataFilters()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
|
||||
plugin.logExporters = filterAndGetClassName(registeredPlugin.getLogExporters()).stream().filter(c -> subgroup == null || c.startsWith(subgroup)).toList();
|
||||
Predicate<Class<?>> packagePredicate = c -> subgroup == null || c.getPackageName().equals(subgroup);
|
||||
plugin.tasks = filterAndGetClassName(registeredPlugin.getTasks(), includeDeprecated, packagePredicate).stream().toList();
|
||||
plugin.triggers = filterAndGetClassName(registeredPlugin.getTriggers(), includeDeprecated, packagePredicate).stream().toList();
|
||||
plugin.conditions = filterAndGetClassName(registeredPlugin.getConditions(), includeDeprecated, packagePredicate).stream().toList();
|
||||
plugin.storages = filterAndGetClassName(registeredPlugin.getStorages(), includeDeprecated, packagePredicate).stream().toList();
|
||||
plugin.secrets = filterAndGetClassName(registeredPlugin.getSecrets(), includeDeprecated, packagePredicate).stream().toList();
|
||||
plugin.taskRunners = filterAndGetClassName(registeredPlugin.getTaskRunners(), includeDeprecated, packagePredicate).stream().toList();
|
||||
plugin.apps = filterAndGetClassName(registeredPlugin.getApps(), includeDeprecated, packagePredicate).stream().toList();
|
||||
plugin.appBlocks = filterAndGetClassName(registeredPlugin.getAppBlocks(), includeDeprecated, packagePredicate).stream().toList();
|
||||
plugin.charts = filterAndGetClassName(registeredPlugin.getCharts(), includeDeprecated, packagePredicate).stream().toList();
|
||||
plugin.dataFilters = filterAndGetClassName(registeredPlugin.getDataFilters(), includeDeprecated, packagePredicate).stream().toList();
|
||||
plugin.logExporters = filterAndGetClassName(registeredPlugin.getLogExporters(), includeDeprecated, packagePredicate).stream().toList();
|
||||
|
||||
return plugin;
|
||||
}
|
||||
@@ -100,12 +109,15 @@ public class Plugin {
|
||||
* Those classes are only filtered from the documentation to ensure backward compatibility.
|
||||
*
|
||||
* @param list The list of classes?
|
||||
* @param includeDeprecated whether to include deprecated plugins or not
|
||||
* @return a filtered streams.
|
||||
*/
|
||||
private static List<String> filterAndGetClassName(final List<? extends Class<?>> list) {
|
||||
private static List<String> filterAndGetClassName(final List<? extends Class<?>> list, boolean includeDeprecated, Predicate<Class<?>> clazzFilter) {
|
||||
return list
|
||||
.stream()
|
||||
.filter(not(io.kestra.core.models.Plugin::isInternal))
|
||||
.filter(p -> includeDeprecated || !io.kestra.core.models.Plugin.isDeprecated(p))
|
||||
.filter(clazzFilter)
|
||||
.map(Class::getName)
|
||||
.filter(c -> !c.startsWith("org.kestra."))
|
||||
.toList();
|
||||
|
||||
@@ -142,12 +142,22 @@ public class HttpRequest {
|
||||
public abstract static class RequestBody {
|
||||
public abstract HttpEntity to() throws IOException;
|
||||
|
||||
public abstract Object getContent() throws IOException;
|
||||
|
||||
public abstract Charset getCharset() throws IOException;
|
||||
|
||||
public abstract String getContentType() throws IOException;
|
||||
|
||||
protected ContentType entityContentType() throws IOException {
|
||||
return this.getCharset() != null ? ContentType.create(this.getContentType(), this.getCharset()) : ContentType.create(this.getContentType());
|
||||
}
|
||||
|
||||
public static RequestBody from(HttpEntity entity) throws IOException {
|
||||
if (entity == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Charset charset = Charset.forName(entity.getContentEncoding());
|
||||
Charset charset = entity.getContentEncoding() != null ? Charset.forName(entity.getContentEncoding()) : StandardCharsets.UTF_8;
|
||||
|
||||
if (entity.getContentType().equals(ContentType.APPLICATION_OCTET_STREAM.getMimeType())) {
|
||||
return ByteArrayRequestBody.builder()
|
||||
@@ -172,71 +182,80 @@ public class HttpRequest {
|
||||
.build();
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("Unsupported Content-Type: " + entity.getContentType());
|
||||
return ByteArrayRequestBody.builder()
|
||||
.charset(charset)
|
||||
.contentType(entity.getContentType())
|
||||
.content(entity.getContent().readAllBytes())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@SuperBuilder
|
||||
public static class InputStreamRequestBody extends RequestBody {
|
||||
@Builder.Default
|
||||
private String contentType = ContentType.APPLICATION_OCTET_STREAM.getMimeType();
|
||||
|
||||
@Builder.Default
|
||||
private Charset charset = StandardCharsets.UTF_8;
|
||||
private Charset charset;
|
||||
|
||||
private InputStream content;
|
||||
|
||||
public HttpEntity to() {
|
||||
return new InputStreamEntity(content, ContentType.create(contentType, charset));
|
||||
public HttpEntity to() throws IOException {
|
||||
return new InputStreamEntity(content, this.entityContentType());
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@SuperBuilder
|
||||
public static class StringRequestBody extends RequestBody {
|
||||
@Builder.Default
|
||||
private String contentType = ContentType.TEXT_PLAIN.getMimeType();
|
||||
|
||||
@Builder.Default
|
||||
private Charset charset = StandardCharsets.UTF_8;
|
||||
private Charset charset;
|
||||
|
||||
private String content;
|
||||
|
||||
public HttpEntity to() {
|
||||
return new StringEntity(this.content, ContentType.create(contentType, charset));
|
||||
public HttpEntity to() throws IOException {
|
||||
return new StringEntity(this.content, this.entityContentType());
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@SuperBuilder
|
||||
public static class ByteArrayRequestBody extends RequestBody {
|
||||
@Builder.Default
|
||||
private String contentType = ContentType.APPLICATION_OCTET_STREAM.getMimeType();
|
||||
|
||||
@Builder.Default
|
||||
private Charset charset = StandardCharsets.UTF_8;
|
||||
private Charset charset;
|
||||
|
||||
private byte[] content;
|
||||
|
||||
public HttpEntity to() {
|
||||
return new ByteArrayEntity(content, ContentType.create(contentType, charset));
|
||||
public HttpEntity to() throws IOException {
|
||||
return new ByteArrayEntity(content, this.entityContentType());
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@SuperBuilder
|
||||
public static class JsonRequestBody extends RequestBody {
|
||||
@Builder.Default
|
||||
private Charset charset = StandardCharsets.UTF_8;
|
||||
private Charset charset;
|
||||
|
||||
private Object content;
|
||||
|
||||
@Override
|
||||
public String getContentType() throws IOException {
|
||||
return ContentType.APPLICATION_JSON.getMimeType();
|
||||
}
|
||||
|
||||
public HttpEntity to() throws IOException {
|
||||
try {
|
||||
return new StringEntity(
|
||||
JacksonMapper.ofJson().writeValueAsString(content),
|
||||
ContentType.APPLICATION_JSON.withCharset(this.charset)
|
||||
this.charset != null ? ContentType.APPLICATION_JSON.withCharset(this.charset) : ContentType.APPLICATION_JSON
|
||||
);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IOException(e);
|
||||
@@ -244,37 +263,49 @@ public class HttpRequest {
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@SuperBuilder
|
||||
public static class UrlEncodedRequestBody extends RequestBody {
|
||||
@Builder.Default
|
||||
private Charset charset = StandardCharsets.UTF_8;
|
||||
private Charset charset;
|
||||
|
||||
private Map<String, Object> content;
|
||||
|
||||
@Override
|
||||
public String getContentType() throws IOException {
|
||||
return ContentType.APPLICATION_FORM_URLENCODED.getMimeType();
|
||||
}
|
||||
|
||||
public HttpEntity to() throws IOException {
|
||||
return new UrlEncodedFormEntity(
|
||||
this.content .entrySet()
|
||||
.stream()
|
||||
.map(e -> new BasicNameValuePair(e.getKey(), e.getValue().toString()))
|
||||
.toList(),
|
||||
this.charset
|
||||
);
|
||||
List<BasicNameValuePair> list = this.content.entrySet()
|
||||
.stream()
|
||||
.map(e -> new BasicNameValuePair(e.getKey(), e.getValue().toString()))
|
||||
.toList();
|
||||
|
||||
return this.charset != null ? new UrlEncodedFormEntity(list, this.charset) : new UrlEncodedFormEntity(list);
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@SuperBuilder
|
||||
public static class MultipartRequestBody extends RequestBody {
|
||||
@Builder.Default
|
||||
private Charset charset = StandardCharsets.UTF_8;
|
||||
private Charset charset;
|
||||
|
||||
private Map<String, Object> content;
|
||||
|
||||
@Override
|
||||
public String getContentType() throws IOException {
|
||||
return ContentType.MULTIPART_MIXED.getMimeType();
|
||||
}
|
||||
|
||||
public HttpEntity to() throws IOException {
|
||||
MultipartEntityBuilder builder = MultipartEntityBuilder
|
||||
.create()
|
||||
.setCharset(this.charset);
|
||||
.create();
|
||||
|
||||
if (this.charset != null) {
|
||||
builder.setCharset(this.charset);
|
||||
}
|
||||
|
||||
content.forEach((key, value) -> {
|
||||
switch (value) {
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.kestra.core.http.client.apache.*;
|
||||
import io.kestra.core.http.client.configurations.HttpConfiguration;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.micrometer.common.util.StringUtils;
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.Builder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -87,47 +88,49 @@ public class HttpClient implements Closeable {
|
||||
|
||||
// Timeout
|
||||
if (this.configuration.getTimeout() != null) {
|
||||
var connectTiemout = runContext.render(this.configuration.getTimeout().getConnectTimeout()).as(Duration.class);
|
||||
connectTiemout.ifPresent(duration -> connectionConfig.setConnectTimeout(Timeout.of(duration)));
|
||||
var connectTimeout = runContext.render(this.configuration.getTimeout().getConnectTimeout()).as(Duration.class);
|
||||
connectTimeout.ifPresent(duration -> connectionConfig.setConnectTimeout(Timeout.of(duration)));
|
||||
|
||||
var readIdleTiemout = runContext.render(this.configuration.getTimeout().getReadIdleTimeout()).as(Duration.class);
|
||||
readIdleTiemout.ifPresent(duration -> connectionConfig.setSocketTimeout(Timeout.of(duration)));
|
||||
var readIdleTimeout = runContext.render(this.configuration.getTimeout().getReadIdleTimeout()).as(Duration.class);
|
||||
readIdleTimeout.ifPresent(duration -> connectionConfig.setSocketTimeout(Timeout.of(duration)));
|
||||
}
|
||||
|
||||
// proxy
|
||||
if (this.configuration.getProxy() != null) {
|
||||
SocketAddress proxyAddr = new InetSocketAddress(
|
||||
runContext.render(configuration.getProxy().getAddress()).as(String.class).orElse(null),
|
||||
runContext.render(configuration.getProxy().getPort()).as(Integer.class).orElse(null)
|
||||
);
|
||||
if (this.configuration.getProxy() != null && configuration.getProxy().getAddress() != null) {
|
||||
String proxyAddress = runContext.render(configuration.getProxy().getAddress()).as(String.class).orElse(null);
|
||||
|
||||
Proxy proxy = new Proxy(runContext.render(configuration.getProxy().getType()).as(Proxy.Type.class).orElse(null), proxyAddr);
|
||||
|
||||
builder.setProxySelector(new ProxySelector() {
|
||||
@Override
|
||||
public void connectFailed(URI uri, SocketAddress sa, IOException e) {
|
||||
/* ignore */
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Proxy> select(URI uri) {
|
||||
return List.of(proxy);
|
||||
}
|
||||
});
|
||||
|
||||
if (this.configuration.getProxy().getUsername() != null && this.configuration.getProxy().getPassword() != null) {
|
||||
builder.setProxyAuthenticationStrategy(new DefaultAuthenticationStrategy());
|
||||
|
||||
credentialsStore.setCredentials(
|
||||
new AuthScope(
|
||||
runContext.render(this.configuration.getProxy().getAddress()).as(String.class).orElse(null),
|
||||
runContext.render(this.configuration.getProxy().getPort()).as(Integer.class).orElse(null)
|
||||
),
|
||||
new UsernamePasswordCredentials(
|
||||
runContext.render(this.configuration.getProxy().getUsername()).as(String.class).orElseThrow(),
|
||||
runContext.render(this.configuration.getProxy().getPassword()).as(String.class).orElseThrow().toCharArray()
|
||||
)
|
||||
if (StringUtils.isNotEmpty(proxyAddress)) {
|
||||
int port = runContext.render(configuration.getProxy().getPort()).as(Integer.class).orElseThrow();
|
||||
SocketAddress proxyAddr = new InetSocketAddress(
|
||||
proxyAddress,
|
||||
port
|
||||
);
|
||||
|
||||
Proxy proxy = new Proxy(runContext.render(configuration.getProxy().getType()).as(Proxy.Type.class).orElse(null), proxyAddr);
|
||||
|
||||
builder.setProxySelector(new ProxySelector() {
|
||||
@Override
|
||||
public void connectFailed(URI uri, SocketAddress sa, IOException e) {
|
||||
/* ignore */
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Proxy> select(URI uri) {
|
||||
return List.of(proxy);
|
||||
}
|
||||
});
|
||||
|
||||
if (this.configuration.getProxy().getUsername() != null && this.configuration.getProxy().getPassword() != null) {
|
||||
builder.setProxyAuthenticationStrategy(new DefaultAuthenticationStrategy());
|
||||
|
||||
credentialsStore.setCredentials(
|
||||
new AuthScope(proxyAddress, port),
|
||||
new UsernamePasswordCredentials(
|
||||
runContext.render(this.configuration.getProxy().getUsername()).as(String.class).orElseThrow(),
|
||||
runContext.render(this.configuration.getProxy().getPassword()).as(String.class).orElseThrow().toCharArray()
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -23,9 +23,9 @@ public class RunContextResponseInterceptor implements HttpResponseInterceptor {
|
||||
response instanceof BasicClassicHttpResponse httpResponse
|
||||
) {
|
||||
try {
|
||||
// FIXME temporary fix for https://github.com/kestra-io/kestra/issues/8092
|
||||
runContext.logger().debug(
|
||||
"Request '{}' from '{}' with the response code '{}'",
|
||||
httpClientContext.getRequest().getUri(),
|
||||
"Request " + httpClientContext.getRequest().getUri() + " from '{}' with the response code '{}'",
|
||||
httpClientContext.getEndpointDetails().getRemoteAddress(),
|
||||
response.getCode()
|
||||
);
|
||||
|
||||
@@ -3,8 +3,8 @@ package io.kestra.core.http.client.configurations;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
|
||||
|
||||
@@ -14,8 +14,9 @@ import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
|
||||
@JsonSubTypes.Type(value = BearerAuthConfiguration.class, name = "BEARER")
|
||||
})
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@NoArgsConstructor
|
||||
public abstract class AbstractAuthConfiguration {
|
||||
public abstract Property<AuthType> getType();
|
||||
public abstract AuthType getType();
|
||||
|
||||
public abstract void configure(HttpClientBuilder builder, RunContext runContext) throws IllegalVariableEvaluationException;
|
||||
|
||||
|
||||
@@ -6,8 +6,7 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
|
||||
import org.apache.hc.core5.http.HttpHeaders;
|
||||
@@ -16,19 +15,20 @@ import org.apache.hc.core5.http.message.BasicHeader;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Base64;
|
||||
|
||||
@Getter
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class BasicAuthConfiguration extends AbstractAuthConfiguration {
|
||||
@NotNull
|
||||
@JsonInclude
|
||||
@Builder.Default
|
||||
protected Property<AuthType> type = Property.of(AuthType.BASIC);
|
||||
protected AuthType type = AuthType.BASIC;
|
||||
|
||||
@Schema(title = "The username for HTTP basic authentication.")
|
||||
private final Property<String> username;
|
||||
private Property<String> username;
|
||||
|
||||
@Schema(title = "The password for HTTP basic authentication.")
|
||||
private final Property<String> password;
|
||||
private Property<String> password;
|
||||
|
||||
@Override
|
||||
public void configure(HttpClientBuilder builder, RunContext runContext) throws IllegalVariableEvaluationException {
|
||||
|
||||
@@ -8,21 +8,23 @@ import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
|
||||
import org.apache.hc.core5.http.HttpHeaders;
|
||||
import org.apache.hc.core5.http.message.BasicHeader;
|
||||
|
||||
@Getter
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class BearerAuthConfiguration extends AbstractAuthConfiguration {
|
||||
@NotNull
|
||||
@JsonInclude
|
||||
@Builder.Default
|
||||
protected Property<AuthType> type = Property.of(AuthType.BEARER);
|
||||
protected AuthType type = AuthType.BEARER;
|
||||
|
||||
@Schema(title = "The token for bearer token authentication.")
|
||||
private final Property<String> token;
|
||||
private Property<String> token;
|
||||
|
||||
@Override
|
||||
public void configure(HttpClientBuilder builder, RunContext runContext) throws IllegalVariableEvaluationException {
|
||||
|
||||
@@ -2,20 +2,21 @@ package io.kestra.core.http.client.configurations;
|
||||
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.micronaut.http.client.HttpClientConfiguration;
|
||||
import io.micronaut.logging.LogLevel;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
|
||||
import java.net.Proxy;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
|
||||
@Builder(toBuilder = true)
|
||||
@Getter
|
||||
@Jacksonized
|
||||
public class HttpConfiguration {
|
||||
@Schema(title = "The timeout configuration.")
|
||||
@PluginProperty
|
||||
@@ -28,6 +29,7 @@ public class HttpConfiguration {
|
||||
@Schema(title = "The authentification to use.")
|
||||
private AbstractAuthConfiguration auth;
|
||||
|
||||
@Setter
|
||||
@Schema(title = "The SSL request options")
|
||||
private SslOptions ssl;
|
||||
|
||||
@@ -35,6 +37,7 @@ public class HttpConfiguration {
|
||||
@Builder.Default
|
||||
private Property<Boolean> followRedirects = Property.of(true);
|
||||
|
||||
@Setter
|
||||
@Schema(title = "If true, allow a failed response code (response code >= 400)")
|
||||
@Builder.Default
|
||||
private Property<Boolean> allowFailed = Property.of(false);
|
||||
@@ -55,261 +58,212 @@ public class HttpConfiguration {
|
||||
}
|
||||
|
||||
// Deprecated properties
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Schema(title = "The time allowed to establish a connection to the server before failing.")
|
||||
@Deprecated
|
||||
private final Property<Duration> connectTimeout;
|
||||
private final Duration connectTimeout;
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
public void setConnectTimeout(Property<Duration> connectTimeout) {
|
||||
if (this.timeout == null) {
|
||||
this.timeout = TimeoutConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.timeout = this.timeout.toBuilder()
|
||||
.connectTimeout(connectTimeout)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Schema(title = "The maximum time allowed for reading data from the server before failing.")
|
||||
@Builder.Default
|
||||
@Deprecated
|
||||
private final Property<Duration> readTimeout = Property.of(Duration.ofSeconds(HttpClientConfiguration.DEFAULT_READ_TIMEOUT_SECONDS));
|
||||
private final Duration readTimeout;
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
public void setReadTimeout(Property<Duration> readTimeout) {
|
||||
if (this.timeout == null) {
|
||||
this.timeout = TimeoutConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.timeout = this.timeout.toBuilder()
|
||||
.readIdleTimeout(readTimeout)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Schema(title = "The type of proxy to use.")
|
||||
@Builder.Default
|
||||
@Deprecated
|
||||
private final Property<Proxy.Type> proxyType = Property.of(Proxy.Type.DIRECT);
|
||||
private final Proxy.Type proxyType;
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
public void setProxyType(Property<Proxy.Type> proxyType) {
|
||||
if (this.proxy == null) {
|
||||
this.proxy = ProxyConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.proxy = this.proxy.toBuilder()
|
||||
.type(proxyType)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Schema(title = "The address of the proxy server.")
|
||||
@Deprecated
|
||||
private final Property<String> proxyAddress;
|
||||
private final String proxyAddress;
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
public void setProxyAddress(Property<String> proxyAddress) {
|
||||
if (this.proxy == null) {
|
||||
this.proxy = ProxyConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.proxy = this.proxy.toBuilder()
|
||||
.address(proxyAddress)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Schema(title = "The port of the proxy server.")
|
||||
@Deprecated
|
||||
private final Property<Integer> proxyPort;
|
||||
private final Integer proxyPort;
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
public void setProxyPort(Property<Integer> proxyPort) {
|
||||
if (this.proxy == null) {
|
||||
this.proxy = ProxyConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.proxy = this.proxy.toBuilder()
|
||||
.port(proxyPort)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Schema(title = "The username for proxy authentication.")
|
||||
@Deprecated
|
||||
private final Property<String> proxyUsername;
|
||||
private final String proxyUsername;
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
public void setProxyUsername(Property<String> proxyUsername) {
|
||||
if (this.proxy == null) {
|
||||
this.proxy = ProxyConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.proxy = this.proxy.toBuilder()
|
||||
.username(proxyUsername)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Schema(title = "The password for proxy authentication.")
|
||||
@Deprecated
|
||||
private final Property<String> proxyPassword;
|
||||
private final String proxyPassword;
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
public void setProxyPassword(Property<String> proxyPassword) {
|
||||
if (this.proxy == null) {
|
||||
this.proxy = ProxyConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.proxy = this.proxy.toBuilder()
|
||||
.password(proxyPassword)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Schema(title = "The username for HTTP basic authentication.")
|
||||
@Deprecated
|
||||
private final Property<String> basicAuthUser;
|
||||
private final String basicAuthUser;
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
public void setBasicAuthUser(Property<String> basicAuthUser) {
|
||||
if (this.auth == null || !(this.auth instanceof BasicAuthConfiguration)) {
|
||||
this.auth = BasicAuthConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.auth = ((BasicAuthConfiguration) this.auth).toBuilder()
|
||||
.username(basicAuthUser)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Schema(title = "The password for HTTP basic authentication.")
|
||||
@Deprecated
|
||||
private final Property<String> basicAuthPassword;
|
||||
private final String basicAuthPassword;
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
private void setBasicAuthPassword(Property<String> basicAuthPassword) {
|
||||
if (this.auth == null || !(this.auth instanceof BasicAuthConfiguration)) {
|
||||
this.auth = BasicAuthConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.auth = ((BasicAuthConfiguration) this.auth).toBuilder()
|
||||
.password(basicAuthPassword)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Schema(title = "The log level for the HTTP client.")
|
||||
@PluginProperty
|
||||
@Deprecated
|
||||
private final LogLevel logLevel;
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
// Deprecated properties with no equivalent value to be kept, silently ignore
|
||||
@Schema(title = "The time allowed for a read connection to remain idle before closing it.")
|
||||
@Deprecated
|
||||
private void setLogLevel(LogLevel logLevel) {
|
||||
if (logLevel == LogLevel.TRACE) {
|
||||
this.logs = new LoggingType[]{
|
||||
LoggingType.REQUEST_HEADERS,
|
||||
LoggingType.REQUEST_BODY,
|
||||
LoggingType.RESPONSE_HEADERS,
|
||||
LoggingType.RESPONSE_BODY
|
||||
};
|
||||
} else if (logLevel == LogLevel.DEBUG) {
|
||||
this.logs = new LoggingType[]{
|
||||
LoggingType.REQUEST_HEADERS,
|
||||
LoggingType.RESPONSE_HEADERS,
|
||||
};
|
||||
} else if (logLevel == LogLevel.INFO) {
|
||||
this.logs = new LoggingType[]{
|
||||
LoggingType.RESPONSE_HEADERS,
|
||||
};
|
||||
private final Duration readIdleTimeout;
|
||||
|
||||
@Schema(title = "The time an idle connection can remain in the client's connection pool before being closed.")
|
||||
@Deprecated
|
||||
private final Duration connectionPoolIdleTimeout;
|
||||
|
||||
@Schema(title = "The maximum content length of the response.")
|
||||
@Deprecated
|
||||
private final Integer maxContentLength;
|
||||
|
||||
public static class HttpConfigurationBuilder {
|
||||
@Deprecated
|
||||
public HttpConfigurationBuilder connectTimeout(Duration connectTimeout) {
|
||||
if (this.timeout == null) {
|
||||
this.timeout = TimeoutConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.timeout = this.timeout.toBuilder()
|
||||
.connectTimeout(Property.of(connectTimeout))
|
||||
.build();
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public HttpConfigurationBuilder readTimeout(Duration readTimeout) {
|
||||
if (this.timeout == null) {
|
||||
this.timeout = TimeoutConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.timeout = this.timeout.toBuilder()
|
||||
.readIdleTimeout(Property.of(readTimeout))
|
||||
.build();
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Deprecated
|
||||
public HttpConfigurationBuilder proxyType(Proxy.Type proxyType) {
|
||||
if (this.proxy == null) {
|
||||
this.proxy = ProxyConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.proxy = this.proxy.toBuilder()
|
||||
.type(Property.of(proxyType))
|
||||
.build();
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public HttpConfigurationBuilder proxyAddress(String proxyAddress) {
|
||||
if (this.proxy == null) {
|
||||
this.proxy = ProxyConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.proxy = this.proxy.toBuilder()
|
||||
.address(Property.of(proxyAddress))
|
||||
.build();
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public HttpConfigurationBuilder proxyPort(Integer proxyPort) {
|
||||
if (this.proxy == null) {
|
||||
this.proxy = ProxyConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.proxy = this.proxy.toBuilder()
|
||||
.port(Property.of(proxyPort))
|
||||
.build();
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public HttpConfigurationBuilder proxyUsername(String proxyUsername) {
|
||||
if (this.proxy == null) {
|
||||
this.proxy = ProxyConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.proxy = this.proxy.toBuilder()
|
||||
.username(Property.of(proxyUsername))
|
||||
.build();
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public HttpConfigurationBuilder proxyPassword(String proxyPassword) {
|
||||
if (this.proxy == null) {
|
||||
this.proxy = ProxyConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.proxy = this.proxy.toBuilder()
|
||||
.password(Property.of(proxyPassword))
|
||||
.build();
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("DeprecatedIsStillUsed")
|
||||
@Deprecated
|
||||
public HttpConfigurationBuilder basicAuthUser(String basicAuthUser) {
|
||||
if (this.auth == null || !(this.auth instanceof BasicAuthConfiguration)) {
|
||||
this.auth = BasicAuthConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.auth = ((BasicAuthConfiguration) this.auth).toBuilder()
|
||||
.username(Property.of(basicAuthUser))
|
||||
.build();
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("DeprecatedIsStillUsed")
|
||||
@Deprecated
|
||||
public HttpConfigurationBuilder basicAuthPassword(String basicAuthPassword) {
|
||||
if (this.auth == null || !(this.auth instanceof BasicAuthConfiguration)) {
|
||||
this.auth = BasicAuthConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.auth = ((BasicAuthConfiguration) this.auth).toBuilder()
|
||||
.password(Property.of(basicAuthPassword))
|
||||
.build();
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public HttpConfigurationBuilder logLevel(LogLevel logLevel) {
|
||||
if (logLevel == LogLevel.TRACE) {
|
||||
this.logs = new LoggingType[]{
|
||||
LoggingType.REQUEST_HEADERS,
|
||||
LoggingType.REQUEST_BODY,
|
||||
LoggingType.RESPONSE_HEADERS,
|
||||
LoggingType.RESPONSE_BODY
|
||||
};
|
||||
} else if (logLevel == LogLevel.DEBUG) {
|
||||
this.logs = new LoggingType[]{
|
||||
LoggingType.REQUEST_HEADERS,
|
||||
LoggingType.RESPONSE_HEADERS,
|
||||
};
|
||||
} else if (logLevel == LogLevel.INFO) {
|
||||
this.logs = new LoggingType[]{
|
||||
LoggingType.RESPONSE_HEADERS,
|
||||
};
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
// Deprecated properties with no real value to be kept, silently ignore
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Schema(title = "The time allowed for a read connection to remain idle before closing it.")
|
||||
@Builder.Default
|
||||
@Deprecated
|
||||
private final Property<Duration> readIdleTimeout = Property.of(Duration.of(HttpClientConfiguration.DEFAULT_READ_IDLE_TIMEOUT_MINUTES, ChronoUnit.MINUTES));
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Schema(title = "The time an idle connection can remain in the client's connection pool before being closed.")
|
||||
@Builder.Default
|
||||
@Deprecated
|
||||
private final Property<Duration> connectionPoolIdleTimeout = Property.of(Duration.ofSeconds(HttpClientConfiguration.DEFAULT_CONNECTION_POOL_IDLE_TIMEOUT_SECONDS));
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*/
|
||||
@Schema(title = "The maximum content length of the response.")
|
||||
@Builder.Default
|
||||
@Deprecated
|
||||
private final Property<Integer> maxContentLength = Property.of(HttpClientConfiguration.DEFAULT_MAX_CONTENT_LENGTH);
|
||||
}
|
||||
|
||||
@@ -4,11 +4,13 @@ import io.kestra.core.models.property.Property;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
|
||||
import java.net.Proxy;
|
||||
|
||||
@Getter
|
||||
@Builder(toBuilder = true)
|
||||
@Jacksonized
|
||||
public class ProxyConfiguration {
|
||||
@Schema(title = "The type of proxy to use.")
|
||||
@Builder.Default
|
||||
|
||||
@@ -21,6 +21,7 @@ import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
|
||||
@@ -176,6 +177,14 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
);
|
||||
}
|
||||
|
||||
public static String uid(Trigger trigger) {
|
||||
return IdUtils.fromParts(
|
||||
trigger.getTenantId(),
|
||||
trigger.getNamespace(),
|
||||
trigger.getFlowId()
|
||||
);
|
||||
}
|
||||
|
||||
public static String uidWithoutRevision(Execution execution) {
|
||||
return IdUtils.fromParts(
|
||||
execution.getTenantId(),
|
||||
@@ -278,6 +287,14 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
public AbstractTrigger findTriggerByTriggerId(String triggerId) {
|
||||
return this.triggers
|
||||
.stream()
|
||||
.filter(trigger -> trigger.getId().equals(triggerId))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated should not be used
|
||||
*/
|
||||
|
||||
@@ -22,6 +22,8 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
/**
|
||||
* Define a plugin properties that will be rendered and converted to a target type at use time.
|
||||
*
|
||||
@@ -136,12 +138,31 @@ public class Property<T> {
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T, I> T asList(Property<T> property, RunContext runContext, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.value == null) {
|
||||
String rendered = runContext.render(property.expression, variables);
|
||||
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
|
||||
try {
|
||||
property.value = MAPPER.readValue(rendered, type);
|
||||
String trimmedExpression = property.expression.trim();
|
||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = MAPPER.readValue(runContext.render(property.expression, variables), type);
|
||||
}
|
||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||
else {
|
||||
List<?> asRawList = MAPPER.readValue(property.expression, List.class);
|
||||
property.value = (T) asRawList.stream()
|
||||
.map(throwFunction(item -> {
|
||||
if (item instanceof String str) {
|
||||
return MAPPER.convertValue(runContext.render(str, variables), itemClazz);
|
||||
} else if (item instanceof Map map) {
|
||||
return MAPPER.convertValue(runContext.render(map, variables), itemClazz);
|
||||
}
|
||||
return item;
|
||||
}))
|
||||
.toList();
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.models.tasks.runners;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.Slugify;
|
||||
@@ -93,6 +94,18 @@ public final class ScriptService {
|
||||
|
||||
}
|
||||
|
||||
public static List<String> replaceInternalStorage(
|
||||
RunContext runContext,
|
||||
Map<String, Object> additionalVars,
|
||||
Property<List<String>> commands,
|
||||
boolean replaceWithRelativePath
|
||||
) throws IOException, IllegalVariableEvaluationException {
|
||||
return commands == null ? Collections.emptyList() :
|
||||
runContext.render(commands).asList(String.class, additionalVars).stream()
|
||||
.map(throwFunction(c -> ScriptService.replaceInternalStorage(runContext, c, replaceWithRelativePath)))
|
||||
.toList();
|
||||
}
|
||||
|
||||
public static List<String> replaceInternalStorage(
|
||||
RunContext runContext,
|
||||
List<String> commands
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
package io.kestra.core.models.tasks.runners;
|
||||
|
||||
import io.kestra.core.models.property.Property;
|
||||
import lombok.With;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
@@ -19,7 +22,11 @@ public interface TaskCommands {
|
||||
|
||||
AbstractLogConsumer getLogConsumer();
|
||||
|
||||
List<String> getCommands();
|
||||
Property<List<String>> getInterpreter();
|
||||
|
||||
Property<List<String>> getBeforeCommands();
|
||||
|
||||
Property<List<String>> getCommands();
|
||||
|
||||
Map<String, Object> getAdditionalVars();
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.WorkerJobLifecycle;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.plugin.core.runner.Process;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
@@ -16,11 +17,11 @@ import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.apache.commons.lang3.SystemUtils;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.utils.WindowsUtils.windowsToUnixPath;
|
||||
|
||||
|
||||
@@ -2,23 +2,26 @@ package io.kestra.core.models.tasks.runners;
|
||||
|
||||
import io.kestra.core.models.tasks.Output;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
@Builder
|
||||
@SuperBuilder
|
||||
@NoArgsConstructor
|
||||
public class TaskRunnerResult<T extends TaskRunnerDetailResult> implements Output {
|
||||
private int exitCode;
|
||||
|
||||
private AbstractLogConsumer logConsumer;
|
||||
|
||||
@Nullable
|
||||
private T details;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public TaskRunnerResult(int exitCode, AbstractLogConsumer logConsumer) {
|
||||
this.exitCode = exitCode;
|
||||
this.logConsumer = logConsumer;
|
||||
this.details = (T) TaskRunnerDetailResult.builder().build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package io.kestra.core.models.triggers;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.validations.TimeWindowValidation;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.With;
|
||||
@@ -19,7 +18,6 @@ public class TimeWindow {
|
||||
title = "The type of the SLA",
|
||||
description = "The default SLA is a sliding window (`DURATION_WINDOW`) with a window of 24 hours."
|
||||
)
|
||||
@NotNull
|
||||
@Builder.Default
|
||||
@PluginProperty
|
||||
private TimeWindow.Type type = TimeWindow.Type.DURATION_WINDOW;
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.plugin.core.trigger.Schedule;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
@@ -210,6 +211,32 @@ public class Trigger extends TriggerContext implements HasUID {
|
||||
.build();
|
||||
}
|
||||
|
||||
public Trigger resetExecution(Flow flow, Execution execution, ConditionContext conditionContext) {
|
||||
boolean disabled = this.getStopAfter() != null ? this.getStopAfter().contains(execution.getState().getCurrent()) : this.getDisabled();
|
||||
if (!disabled) {
|
||||
AbstractTrigger abstractTrigger = flow.findTriggerByTriggerId(this.getTriggerId());
|
||||
if (abstractTrigger == null) {
|
||||
throw new IllegalArgumentException("Unable to find trigger with id '" + this.getTriggerId() + "'");
|
||||
}
|
||||
// If trigger is a schedule and execution ended after the next execution date
|
||||
else if (abstractTrigger instanceof Schedule schedule &&
|
||||
execution.getState().getEndDate().get().isAfter(this.getNextExecutionDate().toInstant())
|
||||
) {
|
||||
RecoverMissedSchedules recoverMissedSchedules = Optional.ofNullable(schedule.getRecoverMissedSchedules())
|
||||
.orElseGet(() -> schedule.defaultRecoverMissedSchedules(conditionContext.getRunContext()));
|
||||
|
||||
ZonedDateTime previousDate = schedule.previousEvaluationDate(conditionContext);
|
||||
|
||||
if (recoverMissedSchedules.equals(RecoverMissedSchedules.LAST)) {
|
||||
return resetExecution(execution.getState().getCurrent(), previousDate);
|
||||
} else if (recoverMissedSchedules.equals(RecoverMissedSchedules.NONE)) {
|
||||
return resetExecution(execution.getState().getCurrent(), schedule.nextEvaluationDate(conditionContext, Optional.empty()));
|
||||
}
|
||||
}
|
||||
}
|
||||
return resetExecution(execution.getState().getCurrent());
|
||||
}
|
||||
|
||||
public Trigger resetExecution(State.Type executionEndState) {
|
||||
return resetExecution(executionEndState, this.getNextExecutionDate());
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.models.triggers.TimeWindow.Type.DURATION_WINDOW;
|
||||
|
||||
public interface MultipleConditionStorageInterface {
|
||||
Optional<MultipleConditionWindow> get(Flow flow, String conditionId);
|
||||
|
||||
@@ -20,7 +22,8 @@ public interface MultipleConditionStorageInterface {
|
||||
ZonedDateTime now = ZonedDateTime.now().withNano(0);
|
||||
TimeWindow timeWindow = multipleCondition.getTimeWindow() != null ? multipleCondition.getTimeWindow() : TimeWindow.builder().build();
|
||||
|
||||
var startAndEnd = switch (timeWindow.getType()) {
|
||||
TimeWindow.Type type = timeWindow.getType() != null ? timeWindow.getType() : DURATION_WINDOW;
|
||||
var startAndEnd = switch (type) {
|
||||
case DURATION_WINDOW -> {
|
||||
Duration window = timeWindow.getWindow() == null ? Duration.ofDays(1) : timeWindow.getWindow();
|
||||
if (window.toDays() > 0) {
|
||||
|
||||
@@ -43,6 +43,7 @@ public class PluginClassLoader extends URLClassLoader {
|
||||
+ "|com.fasterxml.jackson.dataformat.xml"
|
||||
+ "|org.reactivestreams"
|
||||
+ "|dev.failsafe"
|
||||
+ "|reactor"
|
||||
+ ")\\..*$");
|
||||
|
||||
private final ClassLoader parent;
|
||||
|
||||
@@ -154,7 +154,7 @@ public class RegisteredPlugin {
|
||||
result.put("secrets", Arrays.asList(this.getSecrets().toArray(Class[]::new)));
|
||||
result.put("task-runners", Arrays.asList(this.getTaskRunners().toArray(Class[]::new)));
|
||||
result.put("apps", Arrays.asList(this.getApps().toArray(Class[]::new)));
|
||||
result.put("appBlocks", Arrays.asList(this.getAppBlocks().toArray(Class[]::new)));
|
||||
result.put("app-blocks", Arrays.asList(this.getAppBlocks().toArray(Class[]::new)));
|
||||
result.put("charts", Arrays.asList(this.getCharts().toArray(Class[]::new)));
|
||||
result.put("data-filters", Arrays.asList(this.getDataFilters().toArray(Class[]::new)));
|
||||
result.put("log-exporters", Arrays.asList(this.getLogExporters().toArray(Class[]::new)));
|
||||
@@ -173,15 +173,15 @@ public class RegisteredPlugin {
|
||||
var pluginSubGroup = clazz.getPackage().getDeclaredAnnotation(PluginSubGroup.class);
|
||||
|
||||
// some plugins declare subgroup for main plugins
|
||||
if (clazz.getPackageName().length() == this.group().length()) {
|
||||
if (this.group() == null || clazz.getPackageName().length() == this.group().length()) {
|
||||
pluginSubGroup = null;
|
||||
}
|
||||
|
||||
if (pluginSubGroup != null && clazz.getPackageName().startsWith(this.group()) ) {
|
||||
return this.group() + "." + clazz.getPackageName().substring(this.group().length() + 1);
|
||||
} else {
|
||||
if (pluginSubGroup == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return clazz.getPackageName();
|
||||
})
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toSet());
|
||||
@@ -328,6 +328,18 @@ public class RegisteredPlugin {
|
||||
b.append("] ");
|
||||
}
|
||||
|
||||
if (!this.getApps().isEmpty()) {
|
||||
b.append("[Apps: ");
|
||||
b.append(this.getApps().stream().map(Class::getName).collect(Collectors.joining(", ")));
|
||||
b.append("] ");
|
||||
}
|
||||
|
||||
if (!this.getAppBlocks().isEmpty()) {
|
||||
b.append("[AppBlocks: ");
|
||||
b.append(this.getAppBlocks().stream().map(Class::getName).collect(Collectors.joining(", ")));
|
||||
b.append("] ");
|
||||
}
|
||||
|
||||
if (!this.getCharts().isEmpty()) {
|
||||
b.append("[Charts: ");
|
||||
b.append(this.getCharts().stream().map(Class::getName).collect(Collectors.joining(", ")));
|
||||
|
||||
@@ -9,6 +9,7 @@ import jakarta.annotation.Nullable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@@ -19,6 +20,8 @@ public interface DashboardRepositoryInterface {
|
||||
|
||||
ArrayListTotal<Dashboard> list(Pageable pageable, String tenantId, String query);
|
||||
|
||||
List<Dashboard> findAll(String tenantId);
|
||||
|
||||
default Dashboard save(Dashboard dashboard, String source) {
|
||||
return this.save(null, dashboard, source);
|
||||
}
|
||||
|
||||
@@ -103,6 +103,8 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
|
||||
boolean allowDeleted
|
||||
);
|
||||
|
||||
Flux<Execution> findAllAsync(@Nullable String tenantId);
|
||||
|
||||
ArrayListTotal<TaskRun> findTaskRun(
|
||||
Pageable pageable,
|
||||
@Nullable String query,
|
||||
|
||||
@@ -93,6 +93,8 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
|
||||
ZonedDateTime startDate
|
||||
);
|
||||
|
||||
Flux<LogEntry> findAllAsync(@Nullable String tenantId);
|
||||
|
||||
List<LogStatistics> statistics(
|
||||
@Nullable String query,
|
||||
@Nullable String tenantId,
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.models.executions.metrics.MetricAggregations;
|
||||
import io.kestra.plugin.core.dashboard.data.Metrics;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
@@ -28,6 +29,8 @@ public interface MetricRepositoryInterface extends SaveRepositoryInterface<Metri
|
||||
|
||||
Integer purge(Execution execution);
|
||||
|
||||
Flux<MetricEntry> findAllAsync(@Nullable String tenantId);
|
||||
|
||||
default Function<String, String> sortMapping() throws IllegalArgumentException {
|
||||
return s -> s;
|
||||
}
|
||||
|
||||
@@ -475,7 +475,9 @@ public class DefaultRunContext extends RunContext {
|
||||
logger().warn("Unable to cleanup worker task", ex);
|
||||
}
|
||||
|
||||
logger.resetMDC();
|
||||
if (logger != null){
|
||||
logger.resetMDC();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -15,6 +15,7 @@ import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.services.ExecutionService;
|
||||
import io.kestra.core.storages.Storage;
|
||||
import io.kestra.core.trace.TracerFactory;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import io.kestra.core.trace.propagation.ExecutionTextMapSetter;
|
||||
import io.opentelemetry.api.OpenTelemetry;
|
||||
@@ -153,7 +154,7 @@ public final class ExecutableUtils {
|
||||
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
|
||||
}
|
||||
|
||||
List<Label> newLabels = inheritLabels ? new ArrayList<>(currentExecution.getLabels()) : new ArrayList<>(systemLabels(currentExecution));
|
||||
List<Label> newLabels = inheritLabels ? new ArrayList<>(filterLabels(currentExecution.getLabels(), flow)) : new ArrayList<>(systemLabels(currentExecution));
|
||||
if (labels != null) {
|
||||
labels.forEach(throwConsumer(label -> newLabels.add(new Label(runContext.render(label.key()), runContext.render(label.value())))));
|
||||
}
|
||||
@@ -201,6 +202,16 @@ public final class ExecutableUtils {
|
||||
}));
|
||||
}
|
||||
|
||||
private static List<Label> filterLabels(List<Label> labels, Flow flow) {
|
||||
if (ListUtils.isEmpty(flow.getLabels())) {
|
||||
return labels;
|
||||
}
|
||||
|
||||
return labels.stream()
|
||||
.filter(label -> flow.getLabels().stream().noneMatch(flowLabel -> flowLabel.key().equals(label.key())))
|
||||
.toList();
|
||||
}
|
||||
|
||||
private static List<Label> systemLabels(Execution execution) {
|
||||
return Streams.of(execution.getLabels())
|
||||
.filter(label -> label.key().startsWith(Label.SYSTEM_PREFIX))
|
||||
@@ -302,7 +313,10 @@ public final class ExecutableUtils {
|
||||
public static boolean isSubflow(Execution execution) {
|
||||
return execution.getTrigger() != null && (
|
||||
"io.kestra.plugin.core.flow.Subflow".equals(execution.getTrigger().getType()) ||
|
||||
"io.kestra.plugin.core.flow.ForEachItem$ForEachItemExecutable".equals(execution.getTrigger().getType())
|
||||
"io.kestra.plugin.core.flow.ForEachItem$ForEachItemExecutable".equals(execution.getTrigger().getType()) ||
|
||||
"io.kestra.core.tasks.flows.Subflow".equals(execution.getTrigger().getType()) ||
|
||||
"io.kestra.core.tasks.flows.Flow".equals(execution.getTrigger().getType()) ||
|
||||
"io.kestra.core.tasks.flows.ForEachItem$ForEachItemExecutable".equals(execution.getTrigger().getType())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,7 +69,6 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
public class FlowInputOutput {
|
||||
private static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
|
||||
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
|
||||
private static final ObjectMapper JSON_MAPPER = JacksonMapper.ofJson();
|
||||
|
||||
private final StorageInterface storageInterface;
|
||||
private final Optional<String> secretKey;
|
||||
@@ -95,11 +94,12 @@ public class FlowInputOutput {
|
||||
* @return The list of {@link InputAndValue}.
|
||||
*/
|
||||
public Mono<List<InputAndValue>> validateExecutionInputs(final List<Input<?>> inputs,
|
||||
final Execution execution,
|
||||
final Publisher<CompletedPart> data) {
|
||||
final Flow flow,
|
||||
final Execution execution,
|
||||
final Publisher<CompletedPart> data) {
|
||||
if (ListUtils.isEmpty(inputs)) return Mono.just(Collections.emptyList());
|
||||
|
||||
return readData(inputs, execution, data, false).map(inputData -> resolveInputs(inputs, execution, inputData));
|
||||
return readData(inputs, execution, data, false).map(inputData -> resolveInputs(inputs, flow, execution, inputData));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -111,9 +111,9 @@ public class FlowInputOutput {
|
||||
* @return The Map of typed inputs.
|
||||
*/
|
||||
public Mono<Map<String, Object>> readExecutionInputs(final Flow flow,
|
||||
final Execution execution,
|
||||
final Publisher<CompletedPart> data) {
|
||||
return this.readExecutionInputs(flow.getInputs(), execution, data);
|
||||
final Execution execution,
|
||||
final Publisher<CompletedPart> data) {
|
||||
return this.readExecutionInputs(flow.getInputs(), flow, execution, data);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -125,9 +125,10 @@ public class FlowInputOutput {
|
||||
* @return The Map of typed inputs.
|
||||
*/
|
||||
public Mono<Map<String, Object>> readExecutionInputs(final List<Input<?>> inputs,
|
||||
final Flow flow,
|
||||
final Execution execution,
|
||||
final Publisher<CompletedPart> data) {
|
||||
return readData(inputs, execution, data, true).map(inputData -> this.readExecutionInputs(inputs, execution, inputData));
|
||||
return readData(inputs, execution, data, true).map(inputData -> this.readExecutionInputs(inputs, flow, execution, inputData));
|
||||
}
|
||||
|
||||
private Mono<Map<String, Object>> readData(List<Input<?>> inputs, Execution execution, Publisher<CompletedPart> data, boolean uploadFiles) {
|
||||
@@ -192,15 +193,16 @@ public class FlowInputOutput {
|
||||
final Execution execution,
|
||||
final Map<String, ?> data
|
||||
) {
|
||||
return readExecutionInputs(flow.getInputs(), execution, data);
|
||||
return readExecutionInputs(flow.getInputs(), flow, execution, data);
|
||||
}
|
||||
|
||||
private Map<String, Object> readExecutionInputs(
|
||||
final List<Input<?>> inputs,
|
||||
final Flow flow,
|
||||
final Execution execution,
|
||||
final Map<String, ?> data
|
||||
) {
|
||||
Map<String, Object> resolved = this.resolveInputs(inputs, execution, data)
|
||||
Map<String, Object> resolved = this.resolveInputs(inputs, flow, execution, data)
|
||||
.stream()
|
||||
.filter(InputAndValue::enabled)
|
||||
.map(it -> {
|
||||
@@ -225,6 +227,7 @@ public class FlowInputOutput {
|
||||
@VisibleForTesting
|
||||
public List<InputAndValue> resolveInputs(
|
||||
final List<Input<?>> inputs,
|
||||
final Flow flow,
|
||||
final Execution execution,
|
||||
final Map<String, ?> data
|
||||
) {
|
||||
@@ -240,7 +243,7 @@ public class FlowInputOutput {
|
||||
})
|
||||
.collect(Collectors.toMap(it -> it.get().input().getId(), Function.identity(), (o1, o2) -> o1, LinkedHashMap::new)));
|
||||
|
||||
resolvableInputMap.values().forEach(input -> resolveInputValue(input, execution, resolvableInputMap));
|
||||
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap));
|
||||
|
||||
return resolvableInputMap.values().stream().map(ResolvableInput::get).toList();
|
||||
}
|
||||
@@ -248,6 +251,7 @@ public class FlowInputOutput {
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
private InputAndValue resolveInputValue(
|
||||
final @NotNull ResolvableInput resolvable,
|
||||
final Flow flow,
|
||||
final @NotNull Execution execution,
|
||||
final @NotNull Map<String, ResolvableInput> inputs) {
|
||||
|
||||
@@ -258,8 +262,8 @@ public class FlowInputOutput {
|
||||
|
||||
try {
|
||||
// resolve all input dependencies and check whether input is enabled
|
||||
final Map<String, InputAndValue> dependencies = resolveAllDependentInputs(input, execution, inputs);
|
||||
final RunContext runContext = buildRunContextForExecutionAndInputs(execution, dependencies);
|
||||
final Map<String, InputAndValue> dependencies = resolveAllDependentInputs(input, flow, execution, inputs);
|
||||
final RunContext runContext = buildRunContextForExecutionAndInputs(flow, execution, dependencies);
|
||||
|
||||
boolean isInputEnabled = dependencies.isEmpty() || dependencies.values().stream().allMatch(InputAndValue::enabled);
|
||||
|
||||
@@ -325,15 +329,15 @@ public class FlowInputOutput {
|
||||
return resolvable.get();
|
||||
}
|
||||
|
||||
private RunContext buildRunContextForExecutionAndInputs(Execution execution, Map<String, InputAndValue> dependencies) {
|
||||
private RunContext buildRunContextForExecutionAndInputs(final Flow flow, final Execution execution, Map<String, InputAndValue> dependencies) {
|
||||
Map<String, Object> flattenInputs = MapUtils.flattenToNestedMap(dependencies.entrySet()
|
||||
.stream()
|
||||
.collect(HashMap::new, (m, v) -> m.put(v.getKey(), v.getValue().value()), HashMap::putAll)
|
||||
);
|
||||
return runContextFactory.of(null, execution, vars -> vars.withInputs(flattenInputs));
|
||||
return runContextFactory.of(flow, execution, vars -> vars.withInputs(flattenInputs));
|
||||
}
|
||||
|
||||
private Map<String, InputAndValue> resolveAllDependentInputs(final Input<?> input, final Execution execution, final Map<String, ResolvableInput> inputs) {
|
||||
private Map<String, InputAndValue> resolveAllDependentInputs(final Input<?> input, final Flow flow, final Execution execution, final Map<String, ResolvableInput> inputs) {
|
||||
return Optional.ofNullable(input.getDependsOn())
|
||||
.map(DependsOn::inputs)
|
||||
.stream()
|
||||
@@ -341,7 +345,7 @@ public class FlowInputOutput {
|
||||
.filter(id -> !id.equals(input.getId()))
|
||||
.map(inputs::get)
|
||||
.filter(Objects::nonNull) // input may declare unknown or non-necessary dependencies. Let's ignore.
|
||||
.map(it -> resolveInputValue(it, execution, inputs))
|
||||
.map(it -> resolveInputValue(it, flow, execution, inputs))
|
||||
.collect(Collectors.toMap(it -> it.input().getId(), Function.identity()));
|
||||
}
|
||||
|
||||
@@ -401,34 +405,34 @@ public class FlowInputOutput {
|
||||
private Object parseType(Execution execution, Type type, String id, Type elementType, Object current) throws Exception {
|
||||
try {
|
||||
return switch (type) {
|
||||
case SELECT, ENUM, STRING, EMAIL -> current;
|
||||
case SELECT, ENUM, STRING, EMAIL -> current.toString();
|
||||
case SECRET -> {
|
||||
if (secretKey.isEmpty()) {
|
||||
throw new Exception("Unable to use a `SECRET` input/output as encryption is not configured");
|
||||
}
|
||||
yield EncryptionService.encrypt(secretKey.get(), (String) current);
|
||||
yield EncryptionService.encrypt(secretKey.get(), current.toString());
|
||||
}
|
||||
case INT -> current instanceof Integer ? current : Integer.valueOf((String) current);
|
||||
case INT -> current instanceof Integer ? current : Integer.valueOf(current.toString());
|
||||
// Assuming that after the render we must have a double/int, so we can safely use its toString representation
|
||||
case FLOAT -> current instanceof Float ? current : Float.valueOf(current.toString());
|
||||
case BOOLEAN -> current instanceof Boolean ? current : Boolean.valueOf((String) current);
|
||||
case DATETIME -> current instanceof Instant ? current : Instant.parse(((String) current));
|
||||
case DATE -> current instanceof LocalDate ? current : LocalDate.parse(((String) current));
|
||||
case TIME -> current instanceof LocalTime ? current : LocalTime.parse(((String) current));
|
||||
case DURATION -> current instanceof Duration ? current : Duration.parse(((String) current));
|
||||
case BOOLEAN -> current instanceof Boolean ? current : Boolean.valueOf(current.toString());
|
||||
case DATETIME -> current instanceof Instant ? current : Instant.parse(current.toString());
|
||||
case DATE -> current instanceof LocalDate ? current : LocalDate.parse(current.toString());
|
||||
case TIME -> current instanceof LocalTime ? current : LocalTime.parse(current.toString());
|
||||
case DURATION -> current instanceof Duration ? current : Duration.parse(current.toString());
|
||||
case FILE -> {
|
||||
URI uri = URI.create(((String) current).replace(File.separator, "/"));
|
||||
URI uri = URI.create(current.toString().replace(File.separator, "/"));
|
||||
|
||||
if (uri.getScheme() != null && uri.getScheme().equals("kestra")) {
|
||||
yield uri;
|
||||
} else {
|
||||
yield storageInterface.from(execution, id, new File(((String) current)));
|
||||
yield storageInterface.from(execution, id, new File(current.toString()));
|
||||
}
|
||||
}
|
||||
case JSON -> JacksonMapper.toObject(((String) current));
|
||||
case YAML -> YAML_MAPPER.readValue((String) current, JacksonMapper.OBJECT_TYPE_REFERENCE);
|
||||
case JSON -> JacksonMapper.toObject(current.toString());
|
||||
case YAML -> YAML_MAPPER.readValue(current.toString(), JacksonMapper.OBJECT_TYPE_REFERENCE);
|
||||
case URI -> {
|
||||
Matcher matcher = URI_PATTERN.matcher((String) current);
|
||||
Matcher matcher = URI_PATTERN.matcher(current.toString());
|
||||
if (matcher.matches()) {
|
||||
yield current;
|
||||
} else {
|
||||
|
||||
@@ -838,6 +838,8 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
);
|
||||
} catch(Exception e) {
|
||||
// should only occur if it fails in the tracing code which should be unexpected
|
||||
// we add the exception to have some log in that case
|
||||
workerJobCallable.exception = e;
|
||||
return State.Type.FAILED;
|
||||
} finally {
|
||||
synchronized (this) {
|
||||
@@ -1016,6 +1018,17 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
|
||||
@VisibleForTesting
|
||||
public void shutdown() {
|
||||
// initiate shutdown
|
||||
shutdown.compareAndSet(false, true);
|
||||
|
||||
try {
|
||||
// close the WorkerJob queue to stop receiving new JobTask execution.
|
||||
workerJobQueue.close();
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to close the WorkerJobQueue");
|
||||
}
|
||||
|
||||
// close all queues and shutdown now
|
||||
this.receiveCancellations.forEach(Runnable::run);
|
||||
this.executorService.shutdownNow();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,107 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.utils.Slugify;
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplate;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
abstract class AbstractFileFunction implements Function {
|
||||
static final String KESTRA_SCHEME = "kestra:///";
|
||||
static final String TRIGGER = "trigger";
|
||||
static final String NAMESPACE = "namespace";
|
||||
static final String TENANT_ID = "tenantId";
|
||||
static final String ID = "id";
|
||||
|
||||
private static final Pattern EXECUTION_FILE = Pattern.compile(".*/.*/executions/.*/tasks/.*/.*");
|
||||
|
||||
@Inject
|
||||
private FlowService flowService;
|
||||
|
||||
URI getUriFromThePath(Object path, int lineNumber, PebbleTemplate self) {
|
||||
if (path instanceof URI u) {
|
||||
return u;
|
||||
} else if (path instanceof String str && str.startsWith(KESTRA_SCHEME)) {
|
||||
return URI.create(str);
|
||||
} else {
|
||||
throw new PebbleException(null, "Unable to create the URI from the path " + path, lineNumber, self.getName());
|
||||
}
|
||||
}
|
||||
|
||||
boolean isFileUriValid(String namespace, String flowId, String executionId, URI path) {
|
||||
// Internal storage URI should be: kestra:///$namespace/$flowId/executions/$executionId/tasks/$taskName/$taskRunId/$random.ion or kestra:///$namespace/$flowId/executions/$executionId/trigger/$triggerName/$random.ion
|
||||
// We check that the file is for the given flow execution
|
||||
if (namespace == null || flowId == null || executionId == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
String authorizedBasePath = KESTRA_SCHEME + namespace.replace(".", "/") + "/" + Slugify.of(flowId) + "/executions/" + executionId + "/";
|
||||
return path.toString().startsWith(authorizedBasePath);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
String checkAllowedFileAndReturnNamespace(EvaluationContext context, URI path) {
|
||||
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
|
||||
Map<String, String> execution = (Map<String, String>) context.getVariable("execution");
|
||||
|
||||
// check if the file is from the current execution, the parent execution or an allowed namespaces
|
||||
boolean isFileFromCurrentExecution = isFileUriValid(flow.get(NAMESPACE), flow.get(ID), execution.get(ID), path);
|
||||
if (isFileFromCurrentExecution) {
|
||||
return flow.get(NAMESPACE);
|
||||
} else {
|
||||
if (isFileFromParentExecution(context, path)) {
|
||||
Map<String, String> trigger = (Map<String, String>) context.getVariable(TRIGGER);
|
||||
return trigger.get(NAMESPACE);
|
||||
}
|
||||
else {
|
||||
return checkIfFileFromAllowedNamespaceAndReturnIt(context, path, flow.get(TENANT_ID), flow.get(NAMESPACE));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private boolean isFileFromParentExecution(EvaluationContext context, URI path) {
|
||||
if (context.getVariable(TRIGGER) != null) {
|
||||
// if there is a trigger of type execution, we also allow accessing a file from the parent execution
|
||||
Map<String, String> trigger = (Map<String, String>) context.getVariable(TRIGGER);
|
||||
|
||||
if (!isFileUriValid(trigger.get(NAMESPACE), trigger.get("flowId"), trigger.get("executionId"), path)) {
|
||||
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the parent execution");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private String checkIfFileFromAllowedNamespaceAndReturnIt(EvaluationContext context, URI path, String tenantId, String fromNamespace) {
|
||||
// Extract namespace from the path, it should be of the form: kestra:///({tenantId}/){namespace}/{flowId}/executions/{executionId}/tasks/{taskId}/{taskRunId}/{fileName}'
|
||||
// To extract the namespace, we must do it step by step as tenantId, namespace and taskId can contain the words 'executions' and 'tasks'
|
||||
String namespace = path.toString().substring(KESTRA_SCHEME.length());
|
||||
if (!EXECUTION_FILE.matcher(namespace).matches()) {
|
||||
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it is not an execution file");
|
||||
}
|
||||
|
||||
// 1. remove the tenantId if existing
|
||||
if (tenantId != null) {
|
||||
namespace = namespace.substring(tenantId.length() + 1);
|
||||
}
|
||||
// 2. remove everything after tasks
|
||||
namespace = namespace.substring(0, namespace.lastIndexOf("/tasks/"));
|
||||
// 3. remove everything after executions
|
||||
namespace = namespace.substring(0, namespace.lastIndexOf("/executions/"));
|
||||
// 4. remove the flowId
|
||||
namespace = namespace.substring(0, namespace.lastIndexOf('/'));
|
||||
// 5. replace '/' with '.'
|
||||
namespace = namespace.replace("/", ".");
|
||||
|
||||
flowService.checkAllowedNamespace(tenantId, namespace, tenantId, fromNamespace);
|
||||
|
||||
return namespace;
|
||||
}
|
||||
}
|
||||
@@ -2,13 +2,11 @@ package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.Slugify;
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplate;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
@@ -16,12 +14,8 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Singleton
|
||||
public class FileSizeFunction implements Function {
|
||||
public class FileSizeFunction extends AbstractFileFunction {
|
||||
private static final String ERROR_MESSAGE = "The 'fileSize' function expects an argument 'path' that is a path to the internal storage URI.";
|
||||
private static final String KESTRA_SCHEME = "kestra:///";
|
||||
private static final String TRIGGER = "trigger";
|
||||
private static final String NAMESPACE = "namespace";
|
||||
private static final String ID = "id";
|
||||
|
||||
@Inject
|
||||
private StorageInterface storageInterface;
|
||||
@@ -49,53 +43,13 @@ public class FileSizeFunction implements Function {
|
||||
|
||||
}
|
||||
|
||||
private URI getUriFromThePath(Object path, int lineNumber, PebbleTemplate self) {
|
||||
if (path instanceof URI u) {
|
||||
return u;
|
||||
} else if (path instanceof String str && str.startsWith(KESTRA_SCHEME)) {
|
||||
return URI.create(str);
|
||||
} else {
|
||||
throw new PebbleException(null, "Unable to create the URI from the path " + path, lineNumber, self.getName());
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private long getFileSizeFromInternalStorageUri(EvaluationContext context, URI path) throws IOException {
|
||||
// check if the file is from the current execution, the parent execution, or an allowed namespace
|
||||
String namespace = checkAllowedFileAndReturnNamespace(context, path);
|
||||
|
||||
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
|
||||
Map<String, String> execution = (Map<String, String>) context.getVariable("execution");
|
||||
|
||||
boolean isFileFromCurrentExecution = isFileUriValid(flow.get(NAMESPACE), flow.get(ID), execution.get(ID), path);
|
||||
|
||||
if (!isFileFromCurrentExecution) {
|
||||
checkIfFileFromParentExecution(context, path);
|
||||
}
|
||||
|
||||
FileAttributes fileAttributes = storageInterface.getAttributes(flow.get("tenantId"), flow.get("namespace"), path);
|
||||
FileAttributes fileAttributes = storageInterface.getAttributes(flow.get(TENANT_ID), namespace, path);
|
||||
return fileAttributes.getSize();
|
||||
}
|
||||
|
||||
private void checkIfFileFromParentExecution(EvaluationContext context, URI path) {
|
||||
if (context.getVariable(TRIGGER) != null) {
|
||||
// if there is a trigger of type execution, we also allow accessing a file from the parent execution
|
||||
Map<String, String> trigger = (Map<String, String>) context.getVariable(TRIGGER);
|
||||
|
||||
if (!isFileUriValid(trigger.get(NAMESPACE), trigger.get("flowId"), trigger.get("executionId"), path)) {
|
||||
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the current execution");
|
||||
}
|
||||
}
|
||||
else {
|
||||
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the current execution");
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isFileUriValid(String namespace, String flowId, String executionId, URI path) {
|
||||
// Internal storage URI should be: kestra:///$namespace/$flowId/executions/$executionId/tasks/$taskName/$taskRunId/$random.ion or kestra:///$namespace/$flowId/executions/$executionId/trigger/$triggerName/$random.ion
|
||||
// We check that the file is for the given flow execution
|
||||
if (namespace == null || flowId == null || executionId == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
String authorizedBasePath = KESTRA_SCHEME + namespace.replace(".", "/") + "/" + Slugify.of(flowId) + "/executions/" + executionId + "/";
|
||||
return path.toString().startsWith(authorizedBasePath);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,10 +2,7 @@ package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.Slugify;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplate;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -19,15 +16,15 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Singleton
|
||||
public class ReadFileFunction implements Function {
|
||||
public class ReadFileFunction extends AbstractFileFunction {
|
||||
private static final String ERROR_MESSAGE = "The 'read' function expects an argument 'path' that is a path to a namespace file or an internal storage URI.";
|
||||
private static final String KESTRA_SCHEME = "kestra:///";
|
||||
|
||||
@Inject
|
||||
private StorageInterface storageInterface;
|
||||
|
||||
@Value("${kestra.server-type:}") // default to empty as tests didn't set this property
|
||||
private String serverType;
|
||||
// @Value("${kestra.server-type:}") // default to empty as tests didn't set this property
|
||||
// private String serverType;
|
||||
|
||||
@Override
|
||||
public List<String> getArgumentNames() {
|
||||
@@ -70,45 +67,20 @@ public class ReadFileFunction implements Function {
|
||||
@SuppressWarnings("unchecked")
|
||||
private String readFromNamespaceFile(EvaluationContext context, String path) throws IOException {
|
||||
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
|
||||
URI namespaceFile = URI.create(StorageContext.namespaceFilePrefix(flow.get("namespace")) + "/" + path);
|
||||
try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), flow.get("namespace"), namespaceFile)) {
|
||||
URI namespaceFile = URI.create(StorageContext.namespaceFilePrefix(flow.get(NAMESPACE)) + "/" + path);
|
||||
try (InputStream inputStream = storageInterface.get(flow.get(TENANT_ID), flow.get(NAMESPACE), namespaceFile)) {
|
||||
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private String readFromInternalStorageUri(EvaluationContext context, URI path) throws IOException {
|
||||
// check if the file is from the current execution, the parent execution, or an allowed namespace
|
||||
String namespace = checkAllowedFileAndReturnNamespace(context, path);
|
||||
|
||||
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
|
||||
Map<String, String> execution = (Map<String, String>) context.getVariable("execution");
|
||||
|
||||
// check if the file is from the current execution
|
||||
if (!validateFileUri(flow.get("namespace"), flow.get("id"), execution.get("id"), path)) {
|
||||
// if not, it can be from the parent execution, so we check if there is a trigger of type execution
|
||||
if (context.getVariable("trigger") != null) {
|
||||
// if there is a trigger of type execution, we also allow accessing a file from the parent execution
|
||||
Map<String, String> trigger = (Map<String, String>) context.getVariable("trigger");
|
||||
if (!validateFileUri(trigger.get("namespace"), trigger.get("flowId"), trigger.get("executionId"), path)) {
|
||||
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the current execution");
|
||||
}
|
||||
}
|
||||
else {
|
||||
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the current execution");
|
||||
}
|
||||
}
|
||||
|
||||
try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), flow.get("namespace"), path)) {
|
||||
try (InputStream inputStream = storageInterface.get(flow.get(TENANT_ID), namespace, path)) {
|
||||
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean validateFileUri(String namespace, String flowId, String executionId, URI path) {
|
||||
// Internal storage URI should be: kestra:///$namespace/$flowId/executions/$executionId/tasks/$taskName/$taskRunId/$random.ion or kestra:///$namespace/$flowId/executions/$executionId/trigger/$triggerName/$random.ion
|
||||
// We check that the file is for the given flow execution
|
||||
if (namespace == null || flowId == null || executionId == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
String authorizedBasePath = KESTRA_SCHEME + namespace.replace(".", "/") + "/" + Slugify.of(flowId) + "/executions/" + executionId + "/";
|
||||
return path.toString().startsWith(authorizedBasePath);
|
||||
}
|
||||
}
|
||||
@@ -30,7 +30,7 @@ import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.Either;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.models.triggers.RecoverMissedSchedules;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||
import io.micronaut.inject.qualifiers.Qualifiers;
|
||||
@@ -70,7 +70,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
private final QueueInterface<WorkerJob> workerTaskQueue;
|
||||
private final WorkerTriggerResultQueueInterface workerTriggerResultQueue;
|
||||
private final QueueInterface<ExecutionKilled> executionKilledQueue;
|
||||
@SuppressWarnings("rawtypes") private final Optional<QueueInterface> clusterEventQueue;
|
||||
@SuppressWarnings("rawtypes")
|
||||
private final Optional<QueueInterface> clusterEventQueue;
|
||||
protected final FlowListenersInterface flowListeners;
|
||||
private final RunContextFactory runContextFactory;
|
||||
private final RunContextInitializer runContextInitializer;
|
||||
@@ -268,6 +269,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
flows
|
||||
.stream()
|
||||
.map(flow -> pluginDefaultService.injectDefaults(flow, log))
|
||||
.filter(Objects::nonNull)
|
||||
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
|
||||
.flatMap(flow -> flow.getTriggers().stream().filter(trigger -> trigger instanceof WorkerTriggerInterface).map(trigger -> new FlowAndTrigger(flow, trigger)))
|
||||
.forEach(flowAndTrigger -> {
|
||||
@@ -408,8 +411,20 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
private List<FlowWithTriggers> computeSchedulable(List<FlowWithSource> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
|
||||
List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
|
||||
|
||||
triggerContextsToEvaluate.stream()
|
||||
.filter(trigger -> !flows.stream().map(FlowWithSource::uidWithoutRevision).toList().contains(Flow.uid(trigger)))
|
||||
.forEach(trigger -> {
|
||||
try {
|
||||
this.triggerState.delete(trigger);
|
||||
} catch (QueueException e) {
|
||||
log.error("Unable to delete the trigger: {}.{}.{}", trigger.getNamespace(), trigger.getFlowId(), trigger.getTriggerId(), e);
|
||||
}
|
||||
});
|
||||
|
||||
return flows
|
||||
.stream()
|
||||
.map(flow -> pluginDefaultService.injectDefaults(flow, log))
|
||||
.filter(Objects::nonNull)
|
||||
.filter(flow -> flowToKeep.contains(flow.getId()))
|
||||
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
|
||||
.filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
|
||||
@@ -438,7 +453,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
logError(conditionContext, flow, abstractTrigger, e);
|
||||
return null;
|
||||
}
|
||||
this.triggerState.save(triggerContext, scheduleContext);
|
||||
this.triggerState.save(triggerContext, scheduleContext, "/kestra/services/scheduler/compute-schedulable/save/lastTrigger-nextDate-null");
|
||||
} else {
|
||||
triggerContext = lastTrigger;
|
||||
}
|
||||
@@ -565,7 +580,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
Trigger triggerRunning = Trigger.of(f.getTriggerContext(), now);
|
||||
var flowWithTrigger = f.toBuilder().triggerContext(triggerRunning).build();
|
||||
try {
|
||||
this.triggerState.save(triggerRunning, scheduleContext);
|
||||
this.triggerState.save(triggerRunning, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/polling");
|
||||
this.sendWorkerTriggerToWorker(flowWithTrigger);
|
||||
} catch (InternalException e) {
|
||||
logService.logTrigger(
|
||||
@@ -590,7 +605,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
schedule.nextEvaluationDate(f.getConditionContext(), Optional.of(f.getTriggerContext()))
|
||||
);
|
||||
trigger = trigger.checkBackfill();
|
||||
this.triggerState.save(trigger, scheduleContext);
|
||||
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/schedule");
|
||||
}
|
||||
} else {
|
||||
logService.logTrigger(
|
||||
@@ -608,7 +623,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
logError(f, e);
|
||||
}
|
||||
var trigger = f.getTriggerContext().toBuilder().nextExecutionDate(nextExecutionDate).build().checkBackfill();
|
||||
this.triggerState.save(trigger, scheduleContext);
|
||||
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-false");
|
||||
}
|
||||
} catch (Exception ie) {
|
||||
// validate schedule condition can fail to render variables
|
||||
@@ -625,13 +640,14 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.build();
|
||||
ZonedDateTime nextExecutionDate = this.nextEvaluationDate(f.getAbstractTrigger());
|
||||
var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
|
||||
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
|
||||
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private void handleEvaluateWorkerTriggerResult(SchedulerExecutionWithTrigger result, ZonedDateTime nextExecutionDate) {
|
||||
private void handleEvaluateWorkerTriggerResult(SchedulerExecutionWithTrigger result, ZonedDateTime
|
||||
nextExecutionDate) {
|
||||
Optional.ofNullable(result)
|
||||
.ifPresent(executionWithTrigger -> {
|
||||
log(executionWithTrigger);
|
||||
@@ -649,7 +665,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
);
|
||||
}
|
||||
|
||||
private void handleEvaluateSchedulingTriggerResult(Schedulable schedule, SchedulerExecutionWithTrigger result, ConditionContext conditionContext, ScheduleContextInterface scheduleContext) throws Exception {
|
||||
private void handleEvaluateSchedulingTriggerResult(Schedulable schedule, SchedulerExecutionWithTrigger
|
||||
result, ConditionContext conditionContext, ScheduleContextInterface scheduleContext) throws Exception {
|
||||
log(result);
|
||||
Trigger trigger = Trigger.of(
|
||||
result.getTriggerContext(),
|
||||
@@ -665,10 +682,11 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
// Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
|
||||
// So we must save them by passing the scheduleContext.
|
||||
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
|
||||
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handleEvaluateSchedulingTriggerResult/save"));
|
||||
}
|
||||
|
||||
protected void saveLastTriggerAndEmitExecution(Execution execution, Trigger trigger, Consumer<Trigger> saveAction) {
|
||||
protected void saveLastTriggerAndEmitExecution(Execution execution, Trigger
|
||||
trigger, Consumer<Trigger> saveAction) {
|
||||
saveAction.accept(trigger);
|
||||
this.emitExecution(execution, trigger);
|
||||
}
|
||||
@@ -844,7 +862,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
}
|
||||
}
|
||||
|
||||
private void logError(ConditionContext conditionContext, FlowWithSource flow, AbstractTrigger trigger, Throwable e) {
|
||||
private void logError(ConditionContext conditionContext, FlowWithSource flow, AbstractTrigger
|
||||
trigger, Throwable e) {
|
||||
Logger logger = conditionContext.getRunContext().logger();
|
||||
|
||||
logService.logFlow(
|
||||
@@ -1017,4 +1036,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
public ServiceState getState() {
|
||||
return state.get();
|
||||
}
|
||||
|
||||
protected Trigger resetExecution(FlowWithSource flow, Execution execution, Trigger trigger) {
|
||||
Flow flowWithDefaults = pluginDefaultService.injectDefaults(flow, execution);
|
||||
RunContext runContext = runContextFactory.of(flowWithDefaults, flowWithDefaults.findTriggerByTriggerId(trigger.getTriggerId()));
|
||||
ConditionContext conditionContext = conditionService.conditionContext(runContext, flowWithDefaults, null);
|
||||
|
||||
return trigger.resetExecution(flowWithDefaults, execution, conditionContext);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
@@ -21,11 +22,18 @@ public interface SchedulerTriggerStateInterface {
|
||||
|
||||
Trigger create(Trigger trigger) throws ConstraintViolationException;
|
||||
|
||||
Trigger save(Trigger trigger, ScheduleContextInterface scheduleContext, String headerContent) throws ConstraintViolationException;
|
||||
|
||||
Trigger create(Trigger trigger, String headerContent) throws ConstraintViolationException;
|
||||
|
||||
Trigger update(Trigger trigger);
|
||||
|
||||
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
|
||||
|
||||
|
||||
/**
|
||||
* QueueException required for Kafka implementation
|
||||
*/
|
||||
void delete(Trigger trigger) throws QueueException;
|
||||
/**
|
||||
* Used by the JDBC implementation: find triggers in all tenants.
|
||||
*/
|
||||
|
||||
@@ -27,6 +27,7 @@ import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.GraphUtils;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.plugin.core.flow.Pause;
|
||||
import io.kestra.plugin.core.flow.WorkingDirectory;
|
||||
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||
@@ -214,7 +215,7 @@ public class ExecutionService {
|
||||
execution.withState(State.Type.RESTARTED).getState()
|
||||
);
|
||||
|
||||
List<Label> newLabels = new ArrayList<>(execution.getLabels());
|
||||
List<Label> newLabels = new ArrayList<>(ListUtils.emptyOnNull(execution.getLabels()));
|
||||
if (!newLabels.contains(new Label(Label.RESTARTED, "true"))) {
|
||||
newLabels.add(new Label(Label.RESTARTED, "true"));
|
||||
}
|
||||
@@ -297,7 +298,7 @@ public class ExecutionService {
|
||||
taskRunId == null ? new State() : execution.withState(State.Type.RESTARTED).getState()
|
||||
);
|
||||
|
||||
List<Label> newLabels = new ArrayList<>(execution.getLabels());
|
||||
List<Label> newLabels = new ArrayList<>(ListUtils.emptyOnNull(execution.getLabels()));
|
||||
if (!newLabels.contains(new Label(Label.REPLAY, "true"))) {
|
||||
newLabels.add(new Label(Label.REPLAY, "true"));
|
||||
}
|
||||
@@ -486,7 +487,7 @@ public class ExecutionService {
|
||||
return getFirstPausedTaskOr(execution, flow)
|
||||
.flatMap(task -> {
|
||||
if (task.isPresent() && task.get() instanceof Pause pauseTask) {
|
||||
return Mono.just(flowInputOutput.resolveInputs(pauseTask.getOnResume(), execution, Map.of()));
|
||||
return Mono.just(flowInputOutput.resolveInputs(pauseTask.getOnResume(), flow, execution, Map.of()));
|
||||
} else {
|
||||
return Mono.just(Collections.emptyList());
|
||||
}
|
||||
@@ -507,7 +508,7 @@ public class ExecutionService {
|
||||
return getFirstPausedTaskOr(execution, flow)
|
||||
.flatMap(task -> {
|
||||
if (task.isPresent() && task.get() instanceof Pause pauseTask) {
|
||||
return flowInputOutput.validateExecutionInputs(pauseTask.getOnResume(), execution, inputs);
|
||||
return flowInputOutput.validateExecutionInputs(pauseTask.getOnResume(), flow, execution, inputs);
|
||||
} else {
|
||||
return Mono.just(Collections.emptyList());
|
||||
}
|
||||
@@ -528,7 +529,7 @@ public class ExecutionService {
|
||||
return getFirstPausedTaskOr(execution, flow)
|
||||
.flatMap(task -> {
|
||||
if (task.isPresent() && task.get() instanceof Pause pauseTask) {
|
||||
return flowInputOutput.readExecutionInputs(pauseTask.getOnResume(), execution, inputs);
|
||||
return flowInputOutput.readExecutionInputs(pauseTask.getOnResume(), flow, execution, inputs);
|
||||
} else {
|
||||
return Mono.just(Collections.<String, Object>emptyMap());
|
||||
}
|
||||
|
||||
@@ -128,21 +128,13 @@ public class FlowService {
|
||||
return deprecationTraversal("", flow).toList();
|
||||
}
|
||||
|
||||
public List<String> warnings(Flow flow) {
|
||||
|
||||
public List<String> warnings(Flow flow, String tenantId) {
|
||||
if (flow == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
List<String> warnings = new ArrayList<>();
|
||||
List<io.kestra.plugin.core.trigger.Flow> flowTriggers = ListUtils.emptyOnNull(flow.getTriggers()).stream()
|
||||
.filter(io.kestra.plugin.core.trigger.Flow.class::isInstance)
|
||||
.map(io.kestra.plugin.core.trigger.Flow.class::cast)
|
||||
.toList();
|
||||
flowTriggers.forEach(flowTrigger -> {
|
||||
if (ListUtils.emptyOnNull(flowTrigger.getConditions()).isEmpty() && flowTrigger.getPreconditions() == null) {
|
||||
warnings.add("This flow will be triggered for EVERY execution of EVERY flow on your instance. We recommend adding the preconditions property to the Flow trigger '" + flowTrigger.getId() + "'.");
|
||||
}
|
||||
});
|
||||
List<String> warnings = new ArrayList<>(checkValidSubflows(flow, tenantId));
|
||||
|
||||
return warnings;
|
||||
}
|
||||
@@ -161,29 +153,31 @@ public class FlowService {
|
||||
}
|
||||
|
||||
// check if subflow is present in given namespace
|
||||
public void checkValidSubflows(Flow flow) {
|
||||
public List<String> checkValidSubflows(Flow flow, String tenantId) {
|
||||
List<io.kestra.plugin.core.flow.Subflow> subFlows = ListUtils.emptyOnNull(flow.getTasks()).stream()
|
||||
.filter(io.kestra.plugin.core.flow.Subflow.class::isInstance)
|
||||
.map(io.kestra.plugin.core.flow.Subflow.class::cast)
|
||||
.toList();
|
||||
|
||||
Set<ConstraintViolation<?>> violations = new HashSet<>();
|
||||
List<String> violations = new ArrayList<>();
|
||||
|
||||
subFlows.forEach(subflow -> {
|
||||
Optional<Flow> optional = findById(flow.getTenantId(), subflow.getNamespace(), subflow.getFlowId());
|
||||
String regex = ".*\\{\\{.+}}.*"; // regex to check if string contains pebble
|
||||
String subflowId = subflow.getFlowId();
|
||||
String namespace = subflow.getNamespace();
|
||||
if (subflowId.matches(regex) || namespace.matches(regex)) {
|
||||
return;
|
||||
}
|
||||
Optional<Flow> optional = findById(tenantId, subflow.getNamespace(), subflow.getFlowId());
|
||||
|
||||
violations.add(ManualConstraintViolation.of(
|
||||
"The subflow '" + subflow.getFlowId() + "' not found in namespace '" + subflow.getNamespace() + "'.",
|
||||
flow,
|
||||
Flow.class,
|
||||
"flow.tasks",
|
||||
flow.getNamespace()
|
||||
));
|
||||
if (optional.isEmpty()) {
|
||||
violations.add("The subflow '" + subflow.getFlowId() + "' not found in namespace '" + subflow.getNamespace() + "'.");
|
||||
} else if (optional.get().isDisabled()) {
|
||||
violations.add("The subflow '" + subflow.getFlowId() + "' is disabled in namespace '" + subflow.getNamespace() + "'.");
|
||||
}
|
||||
});
|
||||
|
||||
if (!violations.isEmpty()) {
|
||||
throw new ConstraintViolationException(violations);
|
||||
}
|
||||
return violations;
|
||||
}
|
||||
|
||||
public record Relocation(String from, String to) {}
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import jakarta.annotation.Nullable;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@@ -54,9 +55,9 @@ public final class LabelService {
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean containsAll(List<Label> labelsContainer, List<Label> labelsThatMustBeIncluded) {
|
||||
Map<String, String> labelsContainerMap = labelsContainer.stream().collect(HashMap::new, (m, label)-> m.put(label.key(), label.value()), HashMap::putAll);
|
||||
public static boolean containsAll(@Nullable List<Label> labelsContainer, @Nullable List<Label> labelsThatMustBeIncluded) {
|
||||
Map<String, String> labelsContainerMap = ListUtils.emptyOnNull(labelsContainer).stream().collect(HashMap::new, (m, label)-> m.put(label.key(), label.value()), HashMap::putAll);
|
||||
|
||||
return labelsThatMustBeIncluded.stream().allMatch(label -> Objects.equals(labelsContainerMap.get(label.key()), label.value()));
|
||||
return ListUtils.emptyOnNull(labelsThatMustBeIncluded).stream().allMatch(label -> Objects.equals(labelsContainerMap.get(label.key()), label.value()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -177,16 +177,28 @@ public class PluginDefaultService {
|
||||
*/
|
||||
public FlowWithSource injectDefaults(FlowWithSource flow) throws ConstraintViolationException {
|
||||
try {
|
||||
Map<String, Object> flowAsMap = OBJECT_MAPPER.readValue(flow.getSource(), JacksonMapper.MAP_TYPE_REFERENCE);
|
||||
String source = flow.getSource();
|
||||
if (source == null) {
|
||||
// Flow revisions created from older Kestra versions may not be linked to their original source.
|
||||
// In such cases, fall back to the generated source approach to enable plugin default injection.
|
||||
source = flow.generateSource();
|
||||
}
|
||||
|
||||
Flow withDefault = innerInjectDefault(flow, flowAsMap);
|
||||
if (source == null) {
|
||||
// return immediately if source is still null (should never happen)
|
||||
return flow;
|
||||
}
|
||||
|
||||
Map<String, Object> flowAsMap = OBJECT_MAPPER.readValue(source, JacksonMapper.MAP_TYPE_REFERENCE);
|
||||
|
||||
Flow withDefault = innerInjectDefault(flow, flowAsMap);
|
||||
|
||||
// revision and tenants are not in the source, so we copy them manually
|
||||
return withDefault.toBuilder()
|
||||
.tenantId(flow.getTenantId())
|
||||
.revision(flow.getRevision())
|
||||
.build()
|
||||
.withSource(flow.getSource());
|
||||
.withSource(source);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
@@ -36,12 +36,15 @@ public final class TraceUtils {
|
||||
public static Attributes attributesFrom(RunContext runContext) {
|
||||
var flowInfo = runContext.flowInfo();
|
||||
var execution = (Map<String, Object>) runContext.getVariables().get("execution");
|
||||
var executionId = (String) execution.get("id");
|
||||
var executionId = execution != null ? (String) execution.get("id") : null;
|
||||
|
||||
var builder = Attributes.builder()
|
||||
.put(ATTR_NAMESPACE, flowInfo.namespace())
|
||||
.put(ATTR_FLOW_ID, flowInfo.id())
|
||||
.put(ATTR_EXECUTION_ID, executionId);
|
||||
.put(ATTR_FLOW_ID, flowInfo.id());
|
||||
|
||||
if (executionId != null) {
|
||||
builder.put(ATTR_EXECUTION_ID, executionId);
|
||||
}
|
||||
|
||||
if (flowInfo.tenantId() != null) {
|
||||
builder.put(ATTR_TENANT_ID, flowInfo.tenantId());
|
||||
|
||||
@@ -10,6 +10,8 @@ import io.micronaut.validation.validator.constraints.ConstraintValidator;
|
||||
import io.micronaut.validation.validator.constraints.ConstraintValidatorContext;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import static io.kestra.core.models.triggers.TimeWindow.Type.DURATION_WINDOW;
|
||||
|
||||
@Singleton
|
||||
@Introspected
|
||||
public class TimeWindowValidator implements ConstraintValidator<TimeWindowValidation, TimeWindow> {
|
||||
@@ -23,7 +25,8 @@ public class TimeWindowValidator implements ConstraintValidator<TimeWindowValida
|
||||
return true;
|
||||
}
|
||||
|
||||
return switch (value.getType()) {
|
||||
TimeWindow.Type type = value.getType() != null ? value.getType() : DURATION_WINDOW;
|
||||
return switch (type) {
|
||||
case DAILY_TIME_DEADLINE -> {
|
||||
if (value.getWindow() != null || value.getWindowAdvance() != null || value.getStartTime() != null || value.getEndTime() != null) {
|
||||
context.disableDefaultConstraintViolation();
|
||||
|
||||
@@ -28,9 +28,9 @@ import java.util.*;
|
||||
@Schema(
|
||||
title = "Run a flow if the list of preconditions are met in a time window.",
|
||||
description = """
|
||||
**This task is deprecated**, use io.kestra.plugin.core.condition.ExecutionsWindow or io.kestra.plugin.core.condition.FilteredExecutionsWindow instead.
|
||||
**This task is deprecated**, use the `preconditions` property of the `io.kestra.plugin.core.trigger.Flow` trigger instead.
|
||||
Will trigger an executions when all the flows defined by the preconditions are successfully executed in a specific period of time.
|
||||
The period is defined by the `timeSLA` property and is by default a duration window of 24 hours."""
|
||||
The period is defined by the `timeWindow` property and is by default a duration window of 24 hours."""
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@@ -47,7 +47,7 @@ import java.util.*;
|
||||
" - SUCCESS",
|
||||
" - id: multiple",
|
||||
" type: io.kestra.plugin.core.condition.MultipleCondition",
|
||||
" sla:",
|
||||
" timeWindow:",
|
||||
" window: PT12H",
|
||||
" conditions:",
|
||||
" flow-a:",
|
||||
@@ -102,7 +102,7 @@ public class MultipleCondition extends Condition implements io.kestra.core.model
|
||||
|
||||
@Schema(
|
||||
title = "The duration of the window",
|
||||
description = "Deprecated, use `timeSLA.window` instead.")
|
||||
description = "Deprecated, use `timeWindow.window` instead.")
|
||||
@PluginProperty
|
||||
@Deprecated
|
||||
private Duration window;
|
||||
@@ -114,7 +114,7 @@ public class MultipleCondition extends Condition implements io.kestra.core.model
|
||||
|
||||
@Schema(
|
||||
title = "The window advance duration",
|
||||
description = "Deprecated, use `timeSLA.windowAdvance` instead.")
|
||||
description = "Deprecated, use `timeWindow.windowAdvance` instead.")
|
||||
@PluginProperty
|
||||
@Deprecated
|
||||
private Duration windowAdvance;
|
||||
|
||||
@@ -37,8 +37,8 @@ import static io.kestra.core.utils.Rethrow.throwPredicate;
|
||||
"- conditions:",
|
||||
" - type: io.kestra.plugin.core.condition.Not",
|
||||
" conditions:",
|
||||
" - type: io.kestra.plugin.core.condition.DateBetween",
|
||||
" after: \"2013-09-08T16:19:12\"",
|
||||
" - type: io.kestra.plugin.core.condition.DateTimeBetween",
|
||||
" after: \"2013-09-08T16:19:12Z\"",
|
||||
}
|
||||
)
|
||||
},
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
@PluginSubGroup(categories = PluginSubGroup.PluginCategory.CORE)
|
||||
package io.kestra.plugin.core.dashboard.chart;
|
||||
|
||||
import io.kestra.core.models.annotations.PluginSubGroup;
|
||||
@@ -0,0 +1,4 @@
|
||||
@PluginSubGroup(title = "Data Filter", categories = PluginSubGroup.PluginCategory.CORE)
|
||||
package io.kestra.plugin.core.dashboard.data;
|
||||
|
||||
import io.kestra.core.models.annotations.PluginSubGroup;
|
||||
@@ -68,8 +68,8 @@ public class Return extends Task implements RunnableTask<Return.Output> {
|
||||
long end = System.nanoTime();
|
||||
|
||||
runContext
|
||||
.metric(Counter.of("length", Optional.ofNullable(render).map(String::length).orElse(0), "format", render))
|
||||
.metric(Timer.of("duration", Duration.ofNanos(end - start), "format", render));
|
||||
.metric(Counter.of("length", Optional.ofNullable(render).map(String::length).orElse(0)))
|
||||
.metric(Timer.of("duration", Duration.ofNanos(end - start)));
|
||||
|
||||
return Output.builder()
|
||||
.value(render)
|
||||
|
||||
@@ -26,7 +26,6 @@ import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.http.HttpHeaders;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.HashMap;
|
||||
@@ -57,7 +56,8 @@ public abstract class AbstractHttp extends Task implements HttpInterface {
|
||||
|
||||
protected Property<Map<CharSequence, CharSequence>> headers;
|
||||
|
||||
protected HttpConfiguration options;
|
||||
@Builder.Default
|
||||
protected HttpConfiguration options = HttpConfiguration.builder().build();
|
||||
|
||||
@Deprecated
|
||||
@Schema(
|
||||
@@ -72,10 +72,7 @@ public abstract class AbstractHttp extends Task implements HttpInterface {
|
||||
this.options = HttpConfiguration.builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
this.options = this.options.toBuilder()
|
||||
.allowFailed(allowFailed)
|
||||
.build();
|
||||
this.options.setAllowFailed(allowFailed);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
@@ -89,9 +86,7 @@ public abstract class AbstractHttp extends Task implements HttpInterface {
|
||||
}
|
||||
|
||||
this.sslOptions = sslOptions;
|
||||
this.options = this.options.toBuilder()
|
||||
.ssl(sslOptions)
|
||||
.build();
|
||||
this.options.setSsl(sslOptions);
|
||||
}
|
||||
|
||||
protected HttpClient client(RunContext runContext) throws IllegalVariableEvaluationException, MalformedURLException, URISyntaxException {
|
||||
@@ -103,9 +98,11 @@ public abstract class AbstractHttp extends Task implements HttpInterface {
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
protected HttpRequest request(RunContext runContext) throws IllegalVariableEvaluationException, URISyntaxException, IOException {
|
||||
// ideally we should URLEncode the path of the UI, but as we cannot URLEncode everything, we handle the common case of space in the URI.
|
||||
String renderedUri = runContext.render(this.uri).as(String.class).map(s -> s.replace(" ", "%20")).orElseThrow();
|
||||
HttpRequest.HttpRequestBuilder request = HttpRequest.builder()
|
||||
.method(runContext.render(this.method).as(String.class).orElse(null))
|
||||
.uri(new URI(runContext.render(this.uri).as(String.class).orElseThrow()));
|
||||
.uri(new URI(renderedUri));
|
||||
|
||||
var renderedFormData = runContext.render(this.formData).asMap(String.class, Object.class);
|
||||
if (!renderedFormData.isEmpty()) {
|
||||
@@ -157,9 +154,11 @@ public abstract class AbstractHttp extends Task implements HttpInterface {
|
||||
request.body(HttpRequest.StringRequestBody.builder()
|
||||
.content(runContext.render(body).as(String.class).orElseThrow())
|
||||
.contentType(runContext.render(this.contentType).as(String.class).orElse(null))
|
||||
.charset(this.options != null ? runContext.render(this.options.getDefaultCharset()).as(Charset.class).orElse(null) : StandardCharsets.UTF_8)
|
||||
.charset(this.options != null && this.options.getDefaultCharset() != null ? runContext.render(this.options.getDefaultCharset()).as(Charset.class).orElse(null) : null)
|
||||
.build()
|
||||
);
|
||||
} else if (this.contentType != null) {
|
||||
request.addHeader("Content-Type", runContext.render(this.contentType).as(String.class).orElse(null));
|
||||
}
|
||||
|
||||
var renderedHeader = runContext.render(this.headers).asMap(CharSequence.class, CharSequence.class);
|
||||
@@ -178,6 +177,7 @@ public abstract class AbstractHttp extends Task implements HttpInterface {
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
return request.build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,6 +135,13 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface,
|
||||
RunContext runContext = conditionContext.getRunContext();
|
||||
Logger logger = runContext.logger();
|
||||
|
||||
if (this.options == null){
|
||||
this.options = HttpConfiguration.builder().build();
|
||||
}
|
||||
// we allow failed status code as it is the condition that must determine whether we trigger the flow
|
||||
options.setAllowFailed(Property.of(true));
|
||||
options.setSsl(this.options.getSsl() != null ? this.options.getSsl() : this.sslOptions);
|
||||
|
||||
var request = Request.builder()
|
||||
.uri(this.uri)
|
||||
.method(this.method)
|
||||
@@ -142,12 +149,7 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface,
|
||||
.formData(this.formData)
|
||||
.contentType(this.contentType)
|
||||
.headers(this.headers)
|
||||
.options((this.options == null ? HttpConfiguration.builder() : this.options.toBuilder())
|
||||
// we allow failed status code as it is the condition that must determine whether we trigger the flow
|
||||
.allowFailed(Property.of(true))
|
||||
.ssl(this.options != null && this.options.getSsl() != null ? this.options.getSsl() : this.sslOptions)
|
||||
.build()
|
||||
)
|
||||
.options(this.options)
|
||||
.encryptBody(this.encryptBody)
|
||||
.build();
|
||||
var output = request.run(runContext);
|
||||
@@ -185,8 +187,6 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface,
|
||||
}
|
||||
|
||||
this.sslOptions = sslOptions;
|
||||
this.options = this.options.toBuilder()
|
||||
.ssl(sslOptions)
|
||||
.build();
|
||||
this.options.setSsl(sslOptions);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,7 +96,7 @@ public class PurgeLogs extends Task implements RunnableTask<PurgeLogs.Output> {
|
||||
flowService.checkAllowedNamespace(flowInfo.tenantId(), runContext.render(namespace).as(String.class).orElse(null), flowInfo.tenantId(), flowInfo.namespace());
|
||||
}
|
||||
|
||||
var logLevelsRendered = runContext.render(this.logLevels).asList(String.class);
|
||||
var logLevelsRendered = runContext.render(this.logLevels).asList(Level.class);
|
||||
var renderedDate = runContext.render(startDate).as(String.class).orElse(null);
|
||||
int deleted = logService.purge(
|
||||
flowInfo.tenantId(),
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package io.kestra.plugin.core.runner;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.runners.*;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
@@ -14,6 +16,7 @@ import lombok.experimental.SuperBuilder;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
@@ -133,11 +136,14 @@ public class Process extends TaskRunner<TaskRunnerDetailResult> {
|
||||
environment.putAll(this.env(runContext, taskCommands));
|
||||
|
||||
processBuilder.directory(taskCommands.getWorkingDirectory().toFile());
|
||||
processBuilder.command(taskCommands.getCommands());
|
||||
|
||||
List<String> renderedCommands = runContext.render(taskCommands.getCommands()).asList(String.class);
|
||||
|
||||
processBuilder.command(renderedCommands);
|
||||
|
||||
java.lang.Process process = processBuilder.start();
|
||||
long pid = process.pid();
|
||||
logger.debug("Starting command with pid {} [{}]", pid, String.join(" ", taskCommands.getCommands()));
|
||||
logger.debug("Starting command with pid {} [{}]", pid, String.join(" ", renderedCommands));
|
||||
|
||||
LogRunnable stdOutRunnable = new LogRunnable(process.getInputStream(), defaultLogConsumer, false);
|
||||
LogRunnable stdErrRunnable = new LogRunnable(process.getErrorStream(), defaultLogConsumer, true);
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
package io.kestra.core.contexts;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@KestraTest
|
||||
class KestraContextTest {
|
||||
|
||||
@Inject
|
||||
KestraContext context;
|
||||
|
||||
@Test
|
||||
void shouldGetWorkerMaxNumThreads() {
|
||||
// When
|
||||
context.injectWorkerConfigs(16, null);
|
||||
|
||||
// Then
|
||||
assertThat(KestraContext.getContext().getWorkerMaxNumThreads(), is(Optional.of(16)));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldGetWorkerGroupKey() {
|
||||
// When
|
||||
context.injectWorkerConfigs(null, "my-key");
|
||||
|
||||
// Then
|
||||
assertThat(KestraContext.getContext().getWorkerGroupKey(), is(Optional.of("my-key")));
|
||||
}
|
||||
}
|
||||
@@ -149,7 +149,7 @@ class ClassPluginDocumentationTest {
|
||||
assertThat(oneOf.getFirst().get("type"), is("integer"));
|
||||
assertThat(oneOf.getFirst().get("$dynamic"), is(true));
|
||||
assertThat(oneOf.get(1).get("type"), is("string"));
|
||||
assertThat(oneOf.get(1).get("pattern"), is(".*{{.*}}.*"));
|
||||
// assertThat(oneOf.get(1).get("pattern"), is(".*{{.*}}.*"));
|
||||
|
||||
Map<String, Object> withDefault = (Map<String, Object>) properties.get("withDefault");
|
||||
assertThat(withDefault.get("type"), is("string"));
|
||||
|
||||
@@ -26,6 +26,7 @@ import io.kestra.plugin.core.flow.Dag;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.hamcrest.Matchers;
|
||||
@@ -238,6 +239,15 @@ class JsonSchemaGeneratorTest {
|
||||
assertThat(((Map<String, Map<String, Object>>) generate.get("properties")).get("beta").get("$beta"), is(true));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
void requiredAreRemovedIfThereIsADefault() {
|
||||
Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, RequiredWithDefault.class);
|
||||
assertThat(generate, is(not(nullValue())));
|
||||
assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault")));
|
||||
assertThat((List<String>) generate.get("required"), containsInAnyOrder("requiredWithNoDefault"));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
void dashboard() throws URISyntaxException {
|
||||
@@ -324,6 +334,7 @@ class JsonSchemaGeneratorTest {
|
||||
}
|
||||
|
||||
@Schema(title = "Test class")
|
||||
@Builder
|
||||
private static class TestClass {
|
||||
@Schema(title = "Test property")
|
||||
public String testProperty;
|
||||
@@ -360,4 +371,21 @@ class JsonSchemaGeneratorTest {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@SuperBuilder
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Plugin
|
||||
public static class RequiredWithDefault extends Task {
|
||||
@PluginProperty
|
||||
@NotNull
|
||||
@Builder.Default
|
||||
private Property<TaskWithEnum.TestClass> requiredWithDefault = Property.of(TaskWithEnum.TestClass.builder().testProperty("test").build());
|
||||
|
||||
@PluginProperty
|
||||
@NotNull
|
||||
private Property<TaskWithEnum.TestClass> requiredWithNoDefault;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -351,6 +351,21 @@ class HttpClientTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void specialContentType() throws IllegalVariableEvaluationException, HttpClientException, IOException {
|
||||
try (HttpClient client = client()) {
|
||||
HttpResponse<String> response = client.request(
|
||||
HttpRequest.of(URI.create(embeddedServerUri + "/http/content-type"), Map.of(
|
||||
"Content-Type", List.of("application/vnd.campaignsexport.v1+json")
|
||||
)),
|
||||
String.class
|
||||
);
|
||||
|
||||
assertThat(response.getStatus().getCode(), is(200));
|
||||
assertThat(response.getBody(), is("application/vnd.campaignsexport.v1+json"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void getProxy() throws IllegalVariableEvaluationException, HttpClientException, IOException {
|
||||
try (HttpClient client = client(b -> b
|
||||
@@ -383,6 +398,12 @@ class HttpClientTest {
|
||||
return io.micronaut.http.HttpResponse.ok("pong");
|
||||
}
|
||||
|
||||
@Get("content-type")
|
||||
@Produces(MediaType.TEXT_PLAIN)
|
||||
public io.micronaut.http.HttpResponse<String> contentType(io.micronaut.http.HttpRequest<?> request) {
|
||||
return io.micronaut.http.HttpResponse.ok(request.getContentType().orElseThrow().toString());
|
||||
}
|
||||
|
||||
@Get("json")
|
||||
public io.micronaut.http.HttpResponse<Object> json(@QueryValue(defaultValue = "false") Boolean array) {
|
||||
return io.micronaut.http.HttpResponse.ok(array ? List.of(1, 2, 3) : Map.of("ping", "pong"));
|
||||
|
||||
@@ -298,6 +298,30 @@ class PropertyTest {
|
||||
assertThat(output.getMap().get("mapKey2"), is("mapValue2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void aListToRender() throws Exception {
|
||||
var task = DynamicPropertyExampleTask.builder()
|
||||
.items(new Property<>("""
|
||||
["python test.py --input1 \\"{{ item1 }}\\" --input2 \\"{{ item2 }}\\"", "'gs://{{ renderOnce(\\"bucket\\") }}/{{ 'table' }}/{{ 'file' }}_*.csv.gz'"]"""))
|
||||
.properties(new Property<>("""
|
||||
{
|
||||
"key1": "{{value1}}",
|
||||
"key2": "{{value2}}"
|
||||
}"""))
|
||||
.build();
|
||||
var runContext = runContextFactory.of(Map.ofEntries(
|
||||
entry("item1", "item1"),
|
||||
entry("item2", "item2"),
|
||||
entry("value1", "value1"),
|
||||
entry("value2", "value2")
|
||||
));
|
||||
|
||||
var output = task.run(runContext);
|
||||
|
||||
assertThat(output, notNullValue());
|
||||
assertThat(output.getList(), containsInAnyOrder("python test.py --input1 \"item1\" --input2 \"item2\"", "'gs://bucket/table/file_*.csv.gz'"));
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
private static class TestObj {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.models.tasks.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
@@ -149,7 +150,17 @@ public class TaskRunnerTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getCommands() {
|
||||
public Property<List<String>> getInterpreter() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Property<List<String>> getBeforeCommands() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Property<List<String>> getCommands() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@@ -711,4 +711,12 @@ public abstract class AbstractExecutionRepositoryTest {
|
||||
.taskRunList(List.of())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
protected void findAllAsync() {
|
||||
inject();
|
||||
|
||||
List<Execution> executions = executionRepository.findAllAsync(null).collectList().block();
|
||||
assertThat(executions, hasSize(28));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import java.util.List;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@KestraTest
|
||||
@@ -199,7 +200,7 @@ public abstract class AbstractLogRepositoryTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAsych() {
|
||||
void findAsync() {
|
||||
logRepository.save(logEntry(Level.INFO).build());
|
||||
logRepository.save(logEntry(Level.ERROR).build());
|
||||
logRepository.save(logEntry(Level.WARN).build());
|
||||
@@ -208,18 +209,29 @@ public abstract class AbstractLogRepositoryTest {
|
||||
|
||||
Flux<LogEntry> find = logRepository.findAsync(null, "io.kestra.unittest", Level.INFO, startDate);
|
||||
List<LogEntry> logEntries = find.collectList().block();
|
||||
assertThat(logEntries.size(), is(3));
|
||||
assertThat(logEntries, hasSize(3));
|
||||
|
||||
find = logRepository.findAsync(null, null, Level.ERROR, startDate);
|
||||
logEntries = find.collectList().block();
|
||||
assertThat(logEntries.size(), is(1));
|
||||
assertThat(logEntries, hasSize(1));
|
||||
|
||||
find = logRepository.findAsync(null, "io.kestra.unused", Level.INFO, startDate);
|
||||
logEntries = find.collectList().block();
|
||||
assertThat(logEntries.size(), is(0));
|
||||
assertThat(logEntries, hasSize(0));
|
||||
|
||||
find = logRepository.findAsync(null, null, Level.INFO, startDate.plusSeconds(2));
|
||||
logEntries = find.collectList().block();
|
||||
assertThat(logEntries.size(), is(0));
|
||||
assertThat(logEntries, hasSize(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAllAsync() {
|
||||
logRepository.save(logEntry(Level.INFO).build());
|
||||
logRepository.save(logEntry(Level.ERROR).build());
|
||||
logRepository.save(logEntry(Level.WARN).build());
|
||||
|
||||
Flux<LogEntry> find = logRepository.findAllAsync(null);
|
||||
List<LogEntry> logEntries = find.collectList().block();
|
||||
assertThat(logEntries, hasSize(3));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ import io.kestra.core.models.executions.metrics.Timer;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
@@ -17,6 +16,7 @@ import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@KestraTest
|
||||
@@ -95,6 +95,20 @@ public abstract class AbstractMetricRepositoryTest {
|
||||
assertThat(tasksWithMetrics.size(), is(2));
|
||||
}
|
||||
|
||||
@Test
|
||||
void findAllAsync() {
|
||||
String executionId = FriendlyId.createFriendlyId();
|
||||
TaskRun taskRun1 = taskRun(executionId, "task");
|
||||
MetricEntry counter = MetricEntry.of(taskRun1, counter("counter"));
|
||||
TaskRun taskRun2 = taskRun(executionId, "task");
|
||||
MetricEntry timer = MetricEntry.of(taskRun2, timer());
|
||||
metricRepository.save(counter);
|
||||
metricRepository.save(timer);
|
||||
|
||||
List<MetricEntry> results = metricRepository.findAllAsync(null).collectList().block();
|
||||
assertThat(results, hasSize(2));
|
||||
}
|
||||
|
||||
private Counter counter(String metricName) {
|
||||
return Counter.of(metricName, 1);
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package io.kestra.core.runners;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
import io.kestra.core.junit.annotations.ExecuteFlow;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
@@ -10,11 +9,9 @@ import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.queues.MessageTooBigException;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.flow.EachSequentialTest;
|
||||
import io.kestra.plugin.core.flow.FlowCaseTest;
|
||||
import io.kestra.plugin.core.flow.ForEachItemCaseTest;
|
||||
@@ -23,20 +20,12 @@ import io.kestra.plugin.core.flow.WaitForCaseTest;
|
||||
import io.kestra.plugin.core.flow.WorkingDirectoryTest;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
import org.junitpioneer.jupiter.RetryingTest;
|
||||
import org.slf4j.event.Level;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@KestraTest(startRunner = true)
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||
|
||||
@@ -65,7 +65,7 @@ class FlowInputOutputTest {
|
||||
Map<String, Object> data = Map.of("input1", "value1", "input2", "value2");
|
||||
|
||||
// When
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, DEFAULT_TEST_EXECUTION, data);
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, null, DEFAULT_TEST_EXECUTION, data);
|
||||
|
||||
// Then
|
||||
Assertions.assertEquals(
|
||||
@@ -98,7 +98,7 @@ class FlowInputOutputTest {
|
||||
Map<String, Object> data = Map.of("input1", "v1", "input2", "v2", "input3", "v3");
|
||||
|
||||
// When
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, DEFAULT_TEST_EXECUTION, data);
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, null, DEFAULT_TEST_EXECUTION, data);
|
||||
|
||||
// Then
|
||||
Assertions.assertEquals(
|
||||
@@ -132,7 +132,7 @@ class FlowInputOutputTest {
|
||||
Map<String, Object> data = Map.of("input1", "v1", "input2", "v2", "input3", "v3");
|
||||
|
||||
// When
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, DEFAULT_TEST_EXECUTION, data);
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, null, DEFAULT_TEST_EXECUTION, data);
|
||||
|
||||
// Then
|
||||
Assertions.assertEquals(
|
||||
@@ -162,7 +162,7 @@ class FlowInputOutputTest {
|
||||
Map<String, Object> data = Map.of("input1", "value1", "input2", "value2");
|
||||
|
||||
// When
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, DEFAULT_TEST_EXECUTION, data);
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, null, DEFAULT_TEST_EXECUTION, data);
|
||||
|
||||
// Then
|
||||
Assertions.assertEquals(
|
||||
@@ -191,7 +191,7 @@ class FlowInputOutputTest {
|
||||
Map<String, Object> data = Map.of("input1", "value1", "input2", "value2");
|
||||
|
||||
// When
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, DEFAULT_TEST_EXECUTION, data);
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, null, DEFAULT_TEST_EXECUTION, data);
|
||||
|
||||
// Then
|
||||
Assertions.assertEquals(2, values.size());
|
||||
@@ -211,7 +211,7 @@ class FlowInputOutputTest {
|
||||
Publisher<CompletedPart> data = Mono.just(new MemoryCompletedFileUpload("input", "input", "???".getBytes(StandardCharsets.UTF_8)));
|
||||
|
||||
// When
|
||||
List<InputAndValue> values = flowInputOutput.validateExecutionInputs(List.of(input), DEFAULT_TEST_EXECUTION, data).block();
|
||||
List<InputAndValue> values = flowInputOutput.validateExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, data).block();
|
||||
|
||||
// Then
|
||||
Assertions.assertNull(values.getFirst().exception());
|
||||
@@ -238,7 +238,7 @@ class FlowInputOutputTest {
|
||||
Map<String, Object> data = Map.of("input42", "foo");
|
||||
|
||||
// When
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, DEFAULT_TEST_EXECUTION, data);
|
||||
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, null, DEFAULT_TEST_EXECUTION, data);
|
||||
|
||||
// Then
|
||||
Assertions.assertEquals(
|
||||
|
||||
@@ -73,7 +73,7 @@ public class FileSizeFunctionTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldThrowIllegalArgumentException_givenMissingTrigger_andParentExecution() throws IOException {
|
||||
void shouldReadFromAnotherExecution() throws IOException, IllegalVariableEvaluationException {
|
||||
String executionId = IdUtils.create();
|
||||
URI internalStorageURI = getInternalStorageURI(executionId);
|
||||
URI internalStorageFile = getInternalStorageFile(internalStorageURI);
|
||||
@@ -85,12 +85,8 @@ public class FileSizeFunctionTest {
|
||||
"execution", Map.of("id", IdUtils.create())
|
||||
);
|
||||
|
||||
Exception ex = assertThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> variableRenderer.render("{{ fileSize('" + internalStorageFile + "') }}", variables)
|
||||
);
|
||||
|
||||
assertTrue(ex.getMessage().startsWith("Unable to read the file"), "Exception message doesn't match expected one");
|
||||
String size = variableRenderer.render("{{ fileSize('" + internalStorageFile + "') }}", variables);
|
||||
assertThat(size, is(FILE_SIZE));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -125,14 +125,13 @@ class ReadFileFunctionTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void readUnauthorizedInternalStorageFile() throws IOException {
|
||||
void readInternalStorageFileFromAnotherExecution() throws IOException, IllegalVariableEvaluationException {
|
||||
String namespace = "my.namespace";
|
||||
String flowId = "flow";
|
||||
String executionId = IdUtils.create();
|
||||
URI internalStorageURI = URI.create("/" + namespace.replace(".", "/") + "/" + flowId + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion");
|
||||
URI internalStorageFile = storageInterface.put(null, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes()));
|
||||
|
||||
// test for an un-authorized execution with no trigger
|
||||
Map<String, Object> variables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "notme",
|
||||
@@ -140,39 +139,8 @@ class ReadFileFunctionTest {
|
||||
"execution", Map.of("id", "notme")
|
||||
);
|
||||
|
||||
var exception = assertThrows(IllegalArgumentException.class, () -> variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables));
|
||||
assertThat(exception.getMessage(), is("Unable to read the file '" + internalStorageFile + "' as it didn't belong to the current execution"));
|
||||
|
||||
// test for an un-authorized execution with a trigger of type execution
|
||||
Map<String, Object> executionTriggerVariables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "notme",
|
||||
"namespace", "notme"),
|
||||
"execution", Map.of("id", "notme"),
|
||||
"trigger", Map.of(
|
||||
"flowId", "notme",
|
||||
"namespace", "notme",
|
||||
"executionId", "notme"
|
||||
)
|
||||
);
|
||||
|
||||
exception = assertThrows(IllegalArgumentException.class, () -> variableRenderer.render("{{ read('" + internalStorageFile + "') }}", executionTriggerVariables));
|
||||
assertThat(exception.getMessage(), is("Unable to read the file '" + internalStorageFile + "' as it didn't belong to the current execution"));
|
||||
|
||||
// test for an un-authorized execution with a trigger of another type
|
||||
Map<String, Object> triggerVariables = Map.of(
|
||||
"flow", Map.of(
|
||||
"id", "notme",
|
||||
"namespace", "notme"),
|
||||
"execution", Map.of("id", "notme"),
|
||||
"trigger", Map.of(
|
||||
"date", "somedate",
|
||||
"row", "somerow"
|
||||
)
|
||||
);
|
||||
|
||||
exception = assertThrows(IllegalArgumentException.class, () -> variableRenderer.render("{{ read('" + internalStorageFile + "') }}", triggerVariables));
|
||||
assertThat(exception.getMessage(), is("Unable to read the file '" + internalStorageFile + "' as it didn't belong to the current execution"));
|
||||
String render = variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables);
|
||||
assertThat(render, is("Hello from a task output"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -16,6 +16,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.plugin.core.flow.Sleep;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -101,6 +102,17 @@ abstract public class AbstractSchedulerTest {
|
||||
return FlowWithSource.of(flow, flow.generateSource());
|
||||
}
|
||||
|
||||
protected static FlowWithSource createLongRunningFlow(List<AbstractTrigger> triggers, List<PluginDefault> list) {
|
||||
return createFlow(triggers, list)
|
||||
.toBuilder()
|
||||
.tasks(
|
||||
Collections.singletonList(
|
||||
Sleep.builder().id("sleep").type(Sleep.class.getName()).duration(Duration.ofSeconds(125)).build()
|
||||
)
|
||||
)
|
||||
.build();
|
||||
}
|
||||
|
||||
protected static int COUNTER = 0;
|
||||
|
||||
@SuperBuilder
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
package io.kestra.core.schedulers;
|
||||
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
import io.kestra.plugin.core.condition.DayWeekInMonth;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.plugin.core.trigger.Schedule;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
import io.kestra.plugin.core.condition.DayWeekInMonth;
|
||||
import io.kestra.plugin.core.trigger.Schedule;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
@@ -37,6 +38,9 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
@Inject
|
||||
protected FlowRepositoryInterface flowRepository;
|
||||
|
||||
private static Flow createScheduleFlow() {
|
||||
Schedule schedule = Schedule.builder()
|
||||
.id("hourly")
|
||||
@@ -66,6 +70,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
|
||||
CountDownLatch queueCount = new CountDownLatch(4);
|
||||
|
||||
Flow flow = createScheduleFlow();
|
||||
flowRepository.create(flow, flow.generateSource(), flow);
|
||||
|
||||
triggerState.create(Trigger.builder()
|
||||
.namespace(flow.getNamespace())
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package io.kestra.core.schedulers;
|
||||
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
import io.kestra.plugin.core.condition.Expression;
|
||||
@@ -46,6 +48,8 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
private FlowListeners flowListenersService;
|
||||
|
||||
@Inject
|
||||
protected FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Test
|
||||
void pollingTrigger() throws Exception {
|
||||
@@ -92,6 +96,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
|
||||
.toBuilder()
|
||||
.tasks(List.of(Fail.builder().id("fail").type(Fail.class.getName()).build()))
|
||||
.build();
|
||||
flowRepository.create(flow, flow.generateSource(), flow);
|
||||
doReturn(List.of(flow))
|
||||
.when(flowListenersServiceSpy)
|
||||
.flows();
|
||||
|
||||
@@ -6,13 +6,13 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.triggers.RecoverMissedSchedules;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
import io.kestra.plugin.core.trigger.ScheduleOnDates;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.RepeatedTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@@ -25,7 +25,8 @@ import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.oneOf;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
@@ -36,6 +37,9 @@ public class SchedulerScheduleOnDatesTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
|
||||
@Inject
|
||||
protected FlowRepositoryInterface flowRepository;
|
||||
|
||||
private ScheduleOnDates.ScheduleOnDatesBuilder<?, ?> createScheduleOnDatesTrigger(String zone, List<ZonedDateTime> dates, String triggerId) {
|
||||
return ScheduleOnDates.builder()
|
||||
.id(triggerId)
|
||||
@@ -49,9 +53,9 @@ public class SchedulerScheduleOnDatesTest extends AbstractSchedulerTest {
|
||||
|
||||
private Flow createScheduleFlow(String zone, String triggerId) {
|
||||
var now = ZonedDateTime.now();
|
||||
var before = now.minusSeconds(1).truncatedTo(ChronoUnit.SECONDS);
|
||||
var after = now.plusSeconds(1).truncatedTo(ChronoUnit.SECONDS);
|
||||
var later = now.plusSeconds(2).truncatedTo(ChronoUnit.SECONDS);
|
||||
var before = now.minusSeconds(3).truncatedTo(ChronoUnit.SECONDS);
|
||||
var after = now.plusSeconds(3).truncatedTo(ChronoUnit.SECONDS);
|
||||
var later = now.plusSeconds(6).truncatedTo(ChronoUnit.SECONDS);
|
||||
ScheduleOnDates schedule = createScheduleOnDatesTrigger(zone, List.of(before, after, later), triggerId).build();
|
||||
|
||||
return createFlow(Collections.singletonList(schedule));
|
||||
@@ -75,6 +79,7 @@ public class SchedulerScheduleOnDatesTest extends AbstractSchedulerTest {
|
||||
|
||||
// then flow should be executed 4 times
|
||||
Flow flow = createScheduleFlow("Europe/Paris", "schedule");
|
||||
flowRepository.create(flow, flow.generateSource(), flow);
|
||||
|
||||
doReturn(List.of(flow))
|
||||
.when(flowListenersServiceSpy)
|
||||
@@ -101,11 +106,11 @@ public class SchedulerScheduleOnDatesTest extends AbstractSchedulerTest {
|
||||
date.add((String) execution.getTrigger().getVariables().get("date"));
|
||||
executionId.add(execution.getId());
|
||||
|
||||
queueCount.countDown();
|
||||
if (execution.getState().getCurrent() == State.Type.CREATED) {
|
||||
executionQueue.emit(execution.withState(State.Type.SUCCESS));
|
||||
}
|
||||
assertThat(execution.getFlowId(), is(flow.getId()));
|
||||
queueCount.countDown();
|
||||
}));
|
||||
|
||||
scheduler.run();
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package io.kestra.core.schedulers;
|
||||
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.PluginDefault;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
import io.kestra.plugin.core.condition.Expression;
|
||||
@@ -30,8 +33,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
@@ -43,6 +45,9 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
@Inject
|
||||
protected FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
|
||||
protected QueueInterface<LogEntry> logQueue;
|
||||
@@ -93,6 +98,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
FlowWithSource invalid = createScheduleFlow("Asia/Delhi", "schedule", true);
|
||||
FlowWithSource flow = createScheduleFlow("Europe/Paris", "schedule", false);
|
||||
|
||||
flowRepository.create(flow, flow.generateSource(), flow);
|
||||
doReturn(List.of(invalid, flow))
|
||||
.when(flowListenersServiceSpy)
|
||||
.flows();
|
||||
@@ -421,6 +427,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
.when(flowListenersServiceSpy)
|
||||
.flows();
|
||||
|
||||
flowRepository.create(flow, flow.generateSource(), flow);
|
||||
// to avoid waiting too much before a trigger execution, we add a last trigger with a date now - 1m.
|
||||
Trigger lastTrigger = Trigger
|
||||
.builder()
|
||||
@@ -515,4 +522,146 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void recoverLASTLongRunningExecution() throws Exception {
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
String triggerId = "recoverLASTLongRunningExecution";
|
||||
Schedule schedule = Schedule.builder().id(triggerId).type(Schedule.class.getName()).cron("*/5 * * * * *").withSeconds(true).build();
|
||||
FlowWithSource flow = createLongRunningFlow(
|
||||
Collections.singletonList(schedule),
|
||||
List.of(
|
||||
PluginDefault.builder()
|
||||
.type(Schedule.class.getName())
|
||||
.values(Map.of("recoverMissedSchedules", "LAST"))
|
||||
.build()
|
||||
)
|
||||
);
|
||||
flowRepository.create(flow, flow.generateSource(), flow);
|
||||
doReturn(List.of(flow))
|
||||
.when(flowListenersServiceSpy)
|
||||
.flows();
|
||||
|
||||
// to avoid waiting too much before a trigger execution, we add a last trigger with a date now - 1m.
|
||||
Trigger lastTrigger = Trigger
|
||||
.builder()
|
||||
.triggerId(triggerId)
|
||||
.flowId(flow.getId())
|
||||
.namespace(flow.getNamespace())
|
||||
.date(ZonedDateTime.now().minusMinutes(1L))
|
||||
.nextExecutionDate(ZonedDateTime.now().truncatedTo(ChronoUnit.MINUTES))
|
||||
.build();
|
||||
triggerState.create(lastTrigger);
|
||||
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, throwConsumer(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
assertThat(execution.getFlowId(), is(flow.getId()));
|
||||
|
||||
queueCount.countDown();
|
||||
if (execution.getState().getCurrent() == State.Type.CREATED) {
|
||||
Thread.sleep(11000);
|
||||
executionQueue.emit(execution.withState(State.Type.SUCCESS)
|
||||
.toBuilder()
|
||||
.taskRunList(List.of(TaskRun.builder()
|
||||
.id("test")
|
||||
.executionId(execution.getId())
|
||||
.state(State.of(State.Type.SUCCESS,
|
||||
List.of(new State.History(
|
||||
State.Type.SUCCESS,
|
||||
lastTrigger.getNextExecutionDate().plusMinutes(3).toInstant()
|
||||
))))
|
||||
.build()))
|
||||
.build()
|
||||
);
|
||||
}
|
||||
}));
|
||||
|
||||
scheduler.run();
|
||||
|
||||
queueCount.await(3, TimeUnit.MINUTES);
|
||||
receive.blockLast();
|
||||
|
||||
assertThat(queueCount.getCount(), is(0L));
|
||||
|
||||
Trigger trigger = Trigger.of(flow, schedule);
|
||||
Await.until(() -> this.triggerState.findLast(trigger).map(t -> t.getNextExecutionDate().isAfter(lastTrigger.getNextExecutionDate().plusSeconds(10))).orElse(false).booleanValue(), Duration.ofMillis(100), Duration.ofSeconds(20));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void recoverNONELongRunningExecution() throws Exception {
|
||||
// mock flow listeners
|
||||
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
|
||||
String triggerId = "recoverNONELongRunningExecution";
|
||||
Schedule schedule = Schedule.builder().id(triggerId).type(Schedule.class.getName()).cron("*/5 * * * * *").withSeconds(true).build();
|
||||
FlowWithSource flow = createLongRunningFlow(
|
||||
Collections.singletonList(schedule),
|
||||
List.of(
|
||||
PluginDefault.builder()
|
||||
.type(Schedule.class.getName())
|
||||
.values(Map.of("recoverMissedSchedules", "LAST"))
|
||||
.build()
|
||||
)
|
||||
);
|
||||
flowRepository.create(flow, flow.generateSource(), flow);
|
||||
doReturn(List.of(flow))
|
||||
.when(flowListenersServiceSpy)
|
||||
.flows();
|
||||
|
||||
// to avoid waiting too much before a trigger execution, we add a last trigger with a date now - 1m.
|
||||
Trigger lastTrigger = Trigger
|
||||
.builder()
|
||||
.triggerId(triggerId)
|
||||
.flowId(flow.getId())
|
||||
.namespace(flow.getNamespace())
|
||||
.date(ZonedDateTime.now().minusMinutes(1L))
|
||||
.nextExecutionDate(ZonedDateTime.now().truncatedTo(ChronoUnit.MINUTES))
|
||||
.build();
|
||||
triggerState.create(lastTrigger);
|
||||
|
||||
CountDownLatch queueCount = new CountDownLatch(1);
|
||||
|
||||
// scheduler
|
||||
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
|
||||
// wait for execution
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, throwConsumer(either -> {
|
||||
Execution execution = either.getLeft();
|
||||
assertThat(execution.getFlowId(), is(flow.getId()));
|
||||
|
||||
queueCount.countDown();
|
||||
if (execution.getState().getCurrent() == State.Type.CREATED) {
|
||||
Thread.sleep(10000);
|
||||
executionQueue.emit(execution.withState(State.Type.SUCCESS)
|
||||
.toBuilder()
|
||||
.taskRunList(List.of(TaskRun.builder()
|
||||
.id("test")
|
||||
.executionId(execution.getId())
|
||||
.state(State.of(State.Type.SUCCESS,
|
||||
List.of(new State.History(
|
||||
State.Type.SUCCESS,
|
||||
lastTrigger.getNextExecutionDate().plusMinutes(3).toInstant()
|
||||
))))
|
||||
.build()))
|
||||
.build()
|
||||
);
|
||||
}
|
||||
}));
|
||||
|
||||
scheduler.run();
|
||||
|
||||
queueCount.await(3, TimeUnit.MINUTES);
|
||||
receive.blockLast();
|
||||
|
||||
assertThat(queueCount.getCount(), is(0L));
|
||||
|
||||
Trigger trigger = Trigger.of(flow, schedule);
|
||||
Await.until(() -> this.triggerState.findLast(trigger).map(t -> t.getNextExecutionDate().isAfter(lastTrigger.getNextExecutionDate().plusSeconds(10))).orElse(false).booleanValue(), Duration.ofMillis(100), Duration.ofSeconds(20));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,9 +3,8 @@ package io.kestra.core.schedulers;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.PluginDefault;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.WorkerGroup;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.runners.TestMethodScopedWorker;
|
||||
import io.kestra.core.runners.Worker;
|
||||
@@ -17,8 +16,6 @@ import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@@ -27,6 +24,7 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.mockito.Mockito.any;
|
||||
|
||||
public class SchedulerThreadTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
@@ -35,9 +33,13 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
protected SchedulerExecutionStateInterface executionState;
|
||||
|
||||
@Inject
|
||||
protected FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Test
|
||||
void thread() throws Exception {
|
||||
Flow flow = createThreadFlow();
|
||||
flowRepository.create(flow, flow.generateSource(), flow);
|
||||
CountDownLatch queueCount = new CountDownLatch(2);
|
||||
|
||||
// wait for execution
|
||||
|
||||
@@ -5,6 +5,7 @@ import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionKilled;
|
||||
import io.kestra.core.models.executions.ExecutionKilledTrigger;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
@@ -13,12 +14,16 @@ import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.models.triggers.TriggerService;
|
||||
import io.kestra.core.queues.QueueFactoryInterface;
|
||||
import io.kestra.core.queues.QueueInterface;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.runners.FlowListeners;
|
||||
import io.kestra.core.runners.TestMethodScopedWorker;
|
||||
import io.kestra.core.runners.Worker;
|
||||
import io.kestra.core.runners.WorkerTrigger;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.jdbc.runner.JdbcScheduler;
|
||||
import io.kestra.plugin.core.debug.Return;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import lombok.*;
|
||||
@@ -27,14 +32,16 @@ import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
|
||||
|
||||
public class SchedulerTriggerChangeTest extends AbstractSchedulerTest {
|
||||
@Inject
|
||||
@@ -52,6 +59,10 @@ public class SchedulerTriggerChangeTest extends AbstractSchedulerTest {
|
||||
@Named(QueueFactoryInterface.KILL_NAMED)
|
||||
protected QueueInterface<ExecutionKilled> killedQueue;
|
||||
|
||||
@Inject
|
||||
protected FlowRepositoryInterface flowRepository;
|
||||
|
||||
|
||||
public static FlowWithSource createFlow(Duration sleep) {
|
||||
SleepTriggerTest schedule = SleepTriggerTest.builder()
|
||||
.id("sleep")
|
||||
@@ -59,7 +70,7 @@ public class SchedulerTriggerChangeTest extends AbstractSchedulerTest {
|
||||
.sleep(sleep)
|
||||
.build();
|
||||
|
||||
return FlowWithSource.builder()
|
||||
Flow flow = Flow.builder()
|
||||
.id(SchedulerTriggerChangeTest.class.getSimpleName())
|
||||
.namespace("io.kestra.unittest")
|
||||
.revision(1)
|
||||
@@ -71,6 +82,8 @@ public class SchedulerTriggerChangeTest extends AbstractSchedulerTest {
|
||||
.build())
|
||||
)
|
||||
.build();
|
||||
|
||||
return FlowWithSource.of(flow, flow.generateSource());
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -106,6 +119,7 @@ public class SchedulerTriggerChangeTest extends AbstractSchedulerTest {
|
||||
|
||||
// emit a flow trigger to be started
|
||||
FlowWithSource flow = createFlow(Duration.ofSeconds(10));
|
||||
flowRepository.create(flow, flow.generateSource(), flow);
|
||||
flowQueue.emit(flow);
|
||||
|
||||
Await.until(() -> STARTED_COUNT == 1, Duration.ofMillis(100), Duration.ofSeconds(30));
|
||||
|
||||
@@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@KestraTest
|
||||
@@ -204,26 +205,6 @@ class FlowServiceTest {
|
||||
assertThat(collect.stream().filter(flow -> flow.getId().equals("test3")).findFirst().orElseThrow().getRevision(), is(3));
|
||||
}
|
||||
|
||||
@Test
|
||||
void warnings() {
|
||||
Flow flow = create("test", "test", 1).toBuilder()
|
||||
.namespace("system")
|
||||
.triggers(List.of(
|
||||
io.kestra.plugin.core.trigger.Flow.builder()
|
||||
.id("flow-trigger")
|
||||
.type(io.kestra.plugin.core.trigger.Flow.class.getName())
|
||||
.build()
|
||||
))
|
||||
.build();
|
||||
|
||||
List<String> warnings = flowService.warnings(flow);
|
||||
|
||||
assertThat(warnings.size(), is(1));
|
||||
assertThat(warnings, containsInAnyOrder(
|
||||
"This flow will be triggered for EVERY execution of EVERY flow on your instance. We recommend adding the preconditions property to the Flow trigger 'flow-trigger'."
|
||||
));
|
||||
}
|
||||
|
||||
@Test
|
||||
void aliases() {
|
||||
List<FlowService.Relocation> warnings = flowService.relocations("""
|
||||
@@ -323,7 +304,7 @@ class FlowServiceTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void checkValidSubflowsNotFound() {
|
||||
void checkSubflowNotFound() {
|
||||
Flow flow = create("mainFlow", "task", 1).toBuilder()
|
||||
.tasks(List.of(
|
||||
io.kestra.plugin.core.flow.Subflow.builder()
|
||||
@@ -335,11 +316,30 @@ class FlowServiceTest {
|
||||
))
|
||||
.build();
|
||||
|
||||
ConstraintViolationException exception = assertThrows(ConstraintViolationException.class, () -> {
|
||||
flowService.checkValidSubflows(flow);
|
||||
});
|
||||
List<String> exceptions = flowService.checkValidSubflows(flow, null);
|
||||
|
||||
assertThat(exception.getConstraintViolations().size(), is(1));
|
||||
assertThat(exception.getConstraintViolations().iterator().next().getMessage(), is("The subflow 'nonExistentSubflow' not found in namespace 'io.kestra.unittest'."));
|
||||
assertThat(exceptions.size(), is(1));
|
||||
assertThat(exceptions.iterator().next(), is("The subflow 'nonExistentSubflow' not found in namespace 'io.kestra.unittest'."));
|
||||
}
|
||||
|
||||
@Test
|
||||
void checkValidSubflow() {
|
||||
Flow subflow = create("existingSubflow", "task", 1);
|
||||
flowRepository.create(subflow, subflow.generateSource(), subflow);
|
||||
|
||||
Flow flow = create("mainFlow", "task", 1).toBuilder()
|
||||
.tasks(List.of(
|
||||
io.kestra.plugin.core.flow.Subflow.builder()
|
||||
.id("subflowTask")
|
||||
.type(io.kestra.plugin.core.flow.Subflow.class.getName())
|
||||
.namespace("io.kestra.unittest")
|
||||
.flowId("existingSubflow")
|
||||
.build()
|
||||
))
|
||||
.build();
|
||||
|
||||
List<String> exceptions = flowService.checkValidSubflows(flow, null);
|
||||
|
||||
assertThat(exceptions.size(), is(0));
|
||||
}
|
||||
}
|
||||
@@ -10,11 +10,14 @@ import io.kestra.plugin.core.trigger.Schedule;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@KestraTest
|
||||
class LabelServiceTest {
|
||||
@@ -65,4 +68,15 @@ class LabelServiceTest {
|
||||
assertThat(labels, hasSize(2));
|
||||
assertThat(labels, hasItems(new Label("key", "value"), new Label("scheduleLabel", "scheduleValue")));
|
||||
}
|
||||
|
||||
@Test
|
||||
void containsAll() {
|
||||
assertFalse(LabelService.containsAll(null, List.of(new Label("key", "value"))));
|
||||
assertFalse(LabelService.containsAll(Collections.emptyList(), List.of(new Label("key", "value"))));
|
||||
assertFalse(LabelService.containsAll(List.of(new Label("key1", "value1")), List.of(new Label("key2", "value2"))));
|
||||
assertTrue(LabelService.containsAll(List.of(new Label("key", "value")), null));
|
||||
assertTrue(LabelService.containsAll(List.of(new Label("key", "value")), Collections.emptyList()));
|
||||
assertTrue(LabelService.containsAll(List.of(new Label("key1", "value1")), List.of(new Label("key1", "value1"))));
|
||||
assertTrue(LabelService.containsAll(List.of(new Label("key1", "value1"), new Label("key2", "value2")), List.of(new Label("key1", "value1"))));
|
||||
}
|
||||
}
|
||||
@@ -51,6 +51,13 @@ class PluginDefaultServiceTest {
|
||||
Map.of("id", "my-task", "type", "io.kestra.test")
|
||||
)
|
||||
);
|
||||
public static final String TEST_LOG_FLOW_SOURCE = """
|
||||
id: test
|
||||
namespace: io.kestra.unittest
|
||||
tasks:
|
||||
- id: log
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
""";
|
||||
|
||||
@Inject
|
||||
private PluginDefaultService pluginDefaultService;
|
||||
@@ -58,6 +65,19 @@ class PluginDefaultServiceTest {
|
||||
@Inject
|
||||
private YamlParser yamlParser;
|
||||
|
||||
@Test
|
||||
void shouldInjectGivenFlowWithNullSource() {
|
||||
// Given
|
||||
FlowWithSource flow = yamlParser.parse(TEST_LOG_FLOW_SOURCE, FlowWithSource.class);
|
||||
|
||||
// When
|
||||
FlowWithSource result = pluginDefaultService.injectDefaults(flow);
|
||||
|
||||
// Then
|
||||
Log task = (Log) result.getTasks().getFirst();
|
||||
assertThat(task.getMessage(), is("This is a default message"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldInjectGivenDefaultsIncludingType() {
|
||||
// Given
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
package io.kestra.core.storages;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
import io.kestra.core.plugins.DefaultPluginRegistry;
|
||||
import io.kestra.storage.local.LocalStorage;
|
||||
@@ -24,21 +28,28 @@ class StorageInterfaceFactoryTest {
|
||||
void shouldReturnStorageGivenValidId() {
|
||||
StorageInterface storage = StorageInterfaceFactory.make(registry, "local", Map.of("basePath", "/tmp/kestra"), validator);
|
||||
Assertions.assertNotNull(storage);
|
||||
Assertions.assertEquals(LocalStorage.class.getName(), storage.getType());
|
||||
assertEquals(LocalStorage.class.getName(), storage.getType());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldFailedGivenInvalidId() {
|
||||
Assertions.assertThrows(KestraRuntimeException.class,
|
||||
assertThrows(KestraRuntimeException.class,
|
||||
() -> StorageInterfaceFactory.make(registry, "invalid", Map.of(), validator));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldFailedGivenInvalidConfig() {
|
||||
KestraRuntimeException e = Assertions.assertThrows(KestraRuntimeException.class,
|
||||
KestraRuntimeException e = assertThrows(KestraRuntimeException.class,
|
||||
() -> StorageInterfaceFactory.make(registry, "local", Map.of(), validator));
|
||||
|
||||
Assertions.assertTrue(e.getCause() instanceof ConstraintViolationException);
|
||||
Assertions.assertEquals("basePath: must not be null", e.getCause().getMessage());
|
||||
assertTrue(e.getCause() instanceof ConstraintViolationException);
|
||||
assertEquals("basePath: must not be null", e.getCause().getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
void should_not_found_unknown_storage(){
|
||||
KestraRuntimeException e = assertThrows(KestraRuntimeException.class,
|
||||
() -> StorageInterfaceFactory.make(registry, "unknown", Map.of(), validator));
|
||||
assertEquals("No storage interface can be found for 'kestra.storage.type=unknown'. Supported types are: [local]", e.getMessage());
|
||||
}
|
||||
}
|
||||
@@ -49,6 +49,36 @@ public class FlowCaseTest {
|
||||
this.run("OK", State.Type.SUCCESS, State.Type.SUCCESS, 2, "default > amazing", false);
|
||||
}
|
||||
|
||||
public void oldTaskName() throws Exception {
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
AtomicReference<Execution> triggered = new AtomicReference<>();
|
||||
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
if (execution.getFlowId().equals("minimal") && execution.getState().getCurrent().isTerminated()) {
|
||||
triggered.set(execution);
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Execution execution = runnerUtils.runOne(
|
||||
null,
|
||||
"io.kestra.tests",
|
||||
"subflow-old-task-name"
|
||||
);
|
||||
|
||||
countDownLatch.await(1, TimeUnit.MINUTES);
|
||||
receive.blockLast();
|
||||
|
||||
assertThat(execution.getTaskRunList(), hasSize(1));
|
||||
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
|
||||
assertThat(execution.getTaskRunList().getFirst().getOutputs().get("executionId"), is(triggered.get().getId()));
|
||||
assertThat(triggered.get().getTrigger().getType(), is("io.kestra.core.tasks.flows.Subflow"));
|
||||
assertThat(triggered.get().getTrigger().getVariables().get("executionId"), is(execution.getId()));
|
||||
assertThat(triggered.get().getTrigger().getVariables().get("flowId"), is(execution.getFlowId()));
|
||||
assertThat(triggered.get().getTrigger().getVariables().get("namespace"), is(execution.getNamespace()));
|
||||
}
|
||||
|
||||
@SuppressWarnings({"ResultOfMethodCallIgnored", "unchecked"})
|
||||
void run(String input, State.Type fromState, State.Type triggerState, int count, String outputs, boolean testInherited) throws Exception {
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
@@ -99,21 +129,24 @@ public class FlowCaseTest {
|
||||
assertThat(triggered.get().getState().getCurrent(), is(triggerState));
|
||||
|
||||
if (testInherited) {
|
||||
assertThat(triggered.get().getLabels().size(), is(5));
|
||||
assertThat(triggered.get().getLabels().size(), is(6));
|
||||
assertThat(triggered.get().getLabels(), hasItems(
|
||||
new Label(Label.CORRELATION_ID, execution.getId()),
|
||||
new Label("mainFlowExecutionLabel", "execFoo"),
|
||||
new Label("mainFlowLabel", "flowFoo"),
|
||||
new Label("launchTaskLabel", "launchFoo"),
|
||||
new Label("switchFlowLabel", "switchFoo")
|
||||
new Label("switchFlowLabel", "switchFoo"),
|
||||
new Label("overriding", "child")
|
||||
));
|
||||
} else {
|
||||
assertThat(triggered.get().getLabels().size(), is(3));
|
||||
assertThat(triggered.get().getLabels().size(), is(4));
|
||||
assertThat(triggered.get().getLabels(), hasItems(
|
||||
new Label(Label.CORRELATION_ID, execution.getId()),
|
||||
new Label("launchTaskLabel", "launchFoo"),
|
||||
new Label("switchFlowLabel", "switchFoo")
|
||||
new Label("switchFlowLabel", "switchFoo"),
|
||||
new Label("overriding", "child")
|
||||
));
|
||||
assertThat(triggered.get().getLabels(), not(hasItems(new Label("inherited", "label"))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ import org.junit.jupiter.api.Test;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@KestraTest(startRunner = true)
|
||||
public class FlowTest {
|
||||
class FlowTest {
|
||||
@Inject
|
||||
FlowCaseTest flowCaseTest;
|
||||
|
||||
@@ -15,7 +15,7 @@ public class FlowTest {
|
||||
@LoadFlows({"flows/valids/task-flow.yaml",
|
||||
"flows/valids/task-flow-inherited-labels.yaml",
|
||||
"flows/valids/switch.yaml"})
|
||||
public void waitSuccess() throws Exception {
|
||||
void waitSuccess() throws Exception {
|
||||
flowCaseTest.waitSuccess();
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ public class FlowTest {
|
||||
@LoadFlows({"flows/valids/task-flow.yaml",
|
||||
"flows/valids/task-flow-inherited-labels.yaml",
|
||||
"flows/valids/switch.yaml"})
|
||||
public void waitFailed() throws Exception {
|
||||
void waitFailed() throws Exception {
|
||||
flowCaseTest.waitFailed();
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ public class FlowTest {
|
||||
@LoadFlows({"flows/valids/task-flow.yaml",
|
||||
"flows/valids/task-flow-inherited-labels.yaml",
|
||||
"flows/valids/switch.yaml"})
|
||||
public void invalidOutputs() throws Exception {
|
||||
void invalidOutputs() throws Exception {
|
||||
flowCaseTest.invalidOutputs();
|
||||
}
|
||||
|
||||
@@ -39,7 +39,14 @@ public class FlowTest {
|
||||
@LoadFlows({"flows/valids/task-flow.yaml",
|
||||
"flows/valids/task-flow-inherited-labels.yaml",
|
||||
"flows/valids/switch.yaml"})
|
||||
public void noLabels() throws Exception {
|
||||
void noLabels() throws Exception {
|
||||
flowCaseTest.noLabels();
|
||||
}
|
||||
|
||||
@Test
|
||||
@LoadFlows({"flows/valids/subflow-old-task-name.yaml",
|
||||
"flows/valids/minimal.yaml"})
|
||||
void oldTaskName() throws Exception {
|
||||
flowCaseTest.oldTaskName();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static io.kestra.core.models.flows.State.Type.FAILED;
|
||||
import static io.kestra.core.utils.Rethrow.throwRunnable;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
@@ -136,7 +137,6 @@ public class ForEachItemCaseTest {
|
||||
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution execution = either.getLeft();
|
||||
if (execution.getFlowId().equals("for-each-item-subflow")) {
|
||||
log.info("Received sub-execution " + execution.getId() + " with status " + execution.getState().getCurrent());
|
||||
if (execution.getState().getCurrent().isTerminated()) {
|
||||
triggered.set(execution);
|
||||
countDownLatch.countDown();
|
||||
@@ -204,8 +204,9 @@ public class ForEachItemCaseTest {
|
||||
// assert on the main flow execution
|
||||
assertThat(execution.getTaskRunList(), hasSize(3));
|
||||
assertThat(execution.getTaskRunList().get(2).getAttempts(), hasSize(1));
|
||||
assertThat(execution.getTaskRunList().get(2).getAttempts().getFirst().getState().getCurrent(), is(State.Type.FAILED));
|
||||
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
|
||||
assertThat(execution.getTaskRunList().get(2).getAttempts().getFirst().getState().getCurrent(), is(
|
||||
FAILED));
|
||||
assertThat(execution.getState().getCurrent(), is(FAILED));
|
||||
Map<String, Object> outputs = execution.getTaskRunList().get(2).getOutputs();
|
||||
assertThat(outputs.get("numberOfBatches"), is(26));
|
||||
assertThat(outputs.get("iterations"), notNullValue());
|
||||
@@ -215,7 +216,7 @@ public class ForEachItemCaseTest {
|
||||
assertThat(iterations.get("FAILED"), is(26));
|
||||
|
||||
// assert on the last subflow execution
|
||||
assertThat(triggered.get().getState().getCurrent(), is(State.Type.FAILED));
|
||||
assertThat(triggered.get().getState().getCurrent(), is(FAILED));
|
||||
assertThat(triggered.get().getFlowId(), is("for-each-item-subflow-failed"));
|
||||
assertThat((String) triggered.get().getInputs().get("items"), matchesRegex("kestra:///io/kestra/tests/for-each-item-failed/executions/.*/tasks/each-split/.*\\.txt"));
|
||||
assertThat(triggered.get().getTaskRunList(), hasSize(1));
|
||||
@@ -276,7 +277,7 @@ public class ForEachItemCaseTest {
|
||||
}
|
||||
|
||||
public void restartForEachItem() throws Exception {
|
||||
CountDownLatch countDownLatch = new CountDownLatch(26);
|
||||
CountDownLatch countDownLatch = new CountDownLatch(6);
|
||||
Flux<Execution> receiveSubflows = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution subflowExecution = either.getLeft();
|
||||
if (subflowExecution.getFlowId().equals("restart-child") && subflowExecution.getState().getCurrent().isFailed()) {
|
||||
@@ -285,29 +286,33 @@ public class ForEachItemCaseTest {
|
||||
});
|
||||
|
||||
URI file = storageUpload();
|
||||
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
|
||||
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 20);
|
||||
Execution execution = runnerUtils.runOne(null, TEST_NAMESPACE, "restart-for-each-item", null,
|
||||
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs),
|
||||
Duration.ofSeconds(30));
|
||||
assertThat(execution.getTaskRunList(), hasSize(3));
|
||||
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
|
||||
assertThat(execution.getState().getCurrent(), is(FAILED));
|
||||
|
||||
// here we must have 1 failed subflows
|
||||
assertTrue(countDownLatch.await(1, TimeUnit.MINUTES));
|
||||
receiveSubflows.blockLast();
|
||||
|
||||
CountDownLatch successLatch = new CountDownLatch(26);
|
||||
CountDownLatch successLatch = new CountDownLatch(6);
|
||||
receiveSubflows = TestsUtils.receive(executionQueue, either -> {
|
||||
Execution subflowExecution = either.getLeft();
|
||||
if (subflowExecution.getFlowId().equals("restart-child") && subflowExecution.getState().getCurrent().isSuccess()) {
|
||||
successLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
//Wait before restarting until the failed execution tasks are persisted.
|
||||
Thread.sleep(1000L);
|
||||
|
||||
Execution restarted = executionService.restart(execution, null);
|
||||
execution = runnerUtils.awaitExecution(
|
||||
e -> e.getState().getCurrent() == State.Type.SUCCESS && e.getFlowId().equals("restart-for-each-item"),
|
||||
throwRunnable(() -> executionQueue.emit(restarted)),
|
||||
Duration.ofSeconds(10)
|
||||
Duration.ofSeconds(20)
|
||||
);
|
||||
assertThat(execution.getTaskRunList(), hasSize(4));
|
||||
assertTrue(successLatch.await(1, TimeUnit.MINUTES));
|
||||
|
||||
@@ -16,7 +16,7 @@ import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junitpioneer.jupiter.RetryingTest;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.time.Duration;
|
||||
@@ -43,7 +43,7 @@ class TimeoutTest {
|
||||
@Inject
|
||||
private RunnerUtils runnerUtils;
|
||||
|
||||
@Test
|
||||
@RetryingTest(5) // Flaky on CI but never locally even with 100 repetitions
|
||||
void timeout() throws TimeoutException, QueueException {
|
||||
List<LogEntry> logs = new CopyOnWriteArrayList<>();
|
||||
Flux<LogEntry> receive = TestsUtils.receive(workerTaskLogQueue, either -> logs.add(either.getLeft()));
|
||||
|
||||
@@ -20,6 +20,7 @@ import io.micronaut.http.*;
|
||||
import io.micronaut.http.annotation.*;
|
||||
import io.micronaut.http.multipart.StreamingFileUpload;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -188,6 +189,37 @@ class RequestTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void failedPost() throws Exception {
|
||||
try (
|
||||
ApplicationContext applicationContext = ApplicationContext.run();
|
||||
EmbeddedServer server = applicationContext.getBean(EmbeddedServer.class).start();
|
||||
|
||||
) {
|
||||
Request task = Request.builder()
|
||||
.id(RequestTest.class.getSimpleName())
|
||||
.type(RequestTest.class.getName())
|
||||
.uri(Property.of(server.getURL().toString() + "/markdown"))
|
||||
.method(Property.of("POST"))
|
||||
.body(Property.of("# hello web!"))
|
||||
.contentType(Property.of("text/markdown"))
|
||||
.options(HttpConfiguration.builder().defaultCharset(Property.of(null)).build())
|
||||
.build();
|
||||
|
||||
RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, task, ImmutableMap.of());
|
||||
|
||||
HttpClientResponseException exception = assertThrows(
|
||||
HttpClientResponseException.class,
|
||||
() -> task.run(runContext)
|
||||
);
|
||||
|
||||
assertThat(exception.getResponse().getStatus().getCode(), is(417));
|
||||
assertThat(exception.getMessage(), containsString("hello world"));
|
||||
byte[] content = ((io.kestra.core.http.HttpRequest.ByteArrayRequestBody) exception.getRequest().getBody()).getContent();
|
||||
assertThat(new String(content) , containsString("hello web"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void selfSigned() throws Exception {
|
||||
try (
|
||||
@@ -437,6 +469,33 @@ class RequestTest {
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
void basicAuthOld() throws Exception {
|
||||
try (
|
||||
ApplicationContext applicationContext = ApplicationContext.run();
|
||||
EmbeddedServer server = applicationContext.getBean(EmbeddedServer.class).start();
|
||||
) {
|
||||
Request task = Request.builder()
|
||||
.id(RequestTest.class.getSimpleName())
|
||||
.type(RequestTest.class.getName())
|
||||
.uri(Property.of(server.getURL().toString() + "/auth/basic"))
|
||||
.options(HttpConfiguration.builder()
|
||||
.basicAuthUser("John")
|
||||
.basicAuthPassword("p4ss")
|
||||
.build()
|
||||
)
|
||||
.build();
|
||||
|
||||
RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, task, Map.of());
|
||||
|
||||
Request.Output output = task.run(runContext);
|
||||
|
||||
assertThat(output.getBody(), is("{\"hello\":\"John\"}"));
|
||||
assertThat(output.getCode(), is(200));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void bearerAuth() throws Exception {
|
||||
try (
|
||||
@@ -464,6 +523,54 @@ class RequestTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void specialContentType() throws Exception {
|
||||
try (
|
||||
ApplicationContext applicationContext = ApplicationContext.run();
|
||||
EmbeddedServer server = applicationContext.getBean(EmbeddedServer.class).start();
|
||||
|
||||
) {
|
||||
Request task = Request.builder()
|
||||
.id(RequestTest.class.getSimpleName())
|
||||
.type(RequestTest.class.getName())
|
||||
.uri(Property.of(server.getURL().toString() + "/content-type"))
|
||||
.method(Property.of("POST"))
|
||||
.body(Property.of("{}"))
|
||||
.contentType(Property.of("application/vnd.campaignsexport.v1+json"))
|
||||
.options(HttpConfiguration.builder().logs(HttpConfiguration.LoggingType.values()).defaultCharset(null).build())
|
||||
.build();
|
||||
|
||||
RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, task, ImmutableMap.of());
|
||||
|
||||
Request.Output output = task.run(runContext);
|
||||
|
||||
assertThat(output.getBody(), is("application/vnd.campaignsexport.v1+json"));
|
||||
assertThat(output.getCode(), is(200));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void spaceInURI() throws Exception {
|
||||
try (
|
||||
ApplicationContext applicationContext = ApplicationContext.run();
|
||||
EmbeddedServer server = applicationContext.getBean(EmbeddedServer.class).start();
|
||||
|
||||
) {
|
||||
Request task = Request.builder()
|
||||
.id(RequestTest.class.getSimpleName())
|
||||
.type(RequestTest.class.getName())
|
||||
.uri(Property.of(server.getURL().toString() + "/uri with space"))
|
||||
.build();
|
||||
|
||||
RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, task, ImmutableMap.of());
|
||||
|
||||
Request.Output output = task.run(runContext);
|
||||
|
||||
assertThat(output.getBody(), is("Hello World"));
|
||||
assertThat(output.getCode(), is(200));
|
||||
}
|
||||
}
|
||||
|
||||
@Controller
|
||||
static class MockController {
|
||||
@Get("/hello")
|
||||
@@ -471,6 +578,13 @@ class RequestTest {
|
||||
return HttpResponse.ok("{ \"hello\": \"world\" }");
|
||||
}
|
||||
|
||||
@Post("content-type")
|
||||
@Consumes("application/vnd.campaignsexport.v1+json")
|
||||
@Produces(MediaType.TEXT_PLAIN)
|
||||
public io.micronaut.http.HttpResponse<String> contentType(io.micronaut.http.HttpRequest<?> request, @Nullable @Body Map<String, String> body) {
|
||||
return io.micronaut.http.HttpResponse.ok(request.getContentType().orElseThrow().toString());
|
||||
}
|
||||
|
||||
@Head("/hello")
|
||||
HttpResponse<String> head() {
|
||||
return HttpResponse.ok();
|
||||
@@ -481,6 +595,13 @@ class RequestTest {
|
||||
return HttpResponse.status(HttpStatus.EXPECTATION_FAILED).body("{ \"hello\": \"world\" }");
|
||||
}
|
||||
|
||||
@Post("/markdown")
|
||||
@Consumes(MediaType.TEXT_MARKDOWN)
|
||||
@Produces(MediaType.TEXT_MARKDOWN)
|
||||
HttpResponse<String> postMarkdown() {
|
||||
return HttpResponse.status(HttpStatus.EXPECTATION_FAILED).body("# hello world");
|
||||
}
|
||||
|
||||
@Get("/redirect")
|
||||
HttpResponse<String> redirect() {
|
||||
return HttpResponse.redirect(URI.create("/hello"));
|
||||
@@ -537,5 +658,10 @@ class RequestTest {
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Get("/uri%20with%20space")
|
||||
HttpResponse<String> uriWithSpace() {
|
||||
return HttpResponse.ok("Hello World");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,23 +1,28 @@
|
||||
package io.kestra.plugin.core.log;
|
||||
|
||||
import io.kestra.core.junit.annotations.KestraTest;
|
||||
import io.kestra.core.junit.annotations.LoadFlows;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.repositories.LogRepositoryInterface;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
import jakarta.inject.Inject;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.stream.Stream;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@KestraTest
|
||||
@KestraTest(startRunner = true)
|
||||
class PurgeLogsTest {
|
||||
@Inject
|
||||
private RunContextFactory runContextFactory;
|
||||
@@ -25,8 +30,12 @@ class PurgeLogsTest {
|
||||
@Inject
|
||||
private LogRepositoryInterface logRepository;
|
||||
|
||||
@Inject
|
||||
protected RunnerUtils runnerUtils;
|
||||
|
||||
@Test
|
||||
void run() throws Exception {
|
||||
@LoadFlows("flows/valids/purge_logs_no_arguments.yaml")
|
||||
void run_with_no_arguments() throws Exception {
|
||||
// create an execution to delete
|
||||
var logEntry = LogEntry.builder()
|
||||
.namespace("namespace")
|
||||
@@ -37,12 +46,71 @@ class PurgeLogsTest {
|
||||
.build();
|
||||
logRepository.save(logEntry);
|
||||
|
||||
var purge = PurgeLogs.builder()
|
||||
.endDate(Property.of(ZonedDateTime.now().plusMinutes(1).format(DateTimeFormatter.ISO_ZONED_DATE_TIME)))
|
||||
.build();
|
||||
var runContext = runContextFactory.of(Map.of("flow", Map.of("namespace", "namespace", "id", "flowId")));
|
||||
var output = purge.run(runContext);
|
||||
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "purge_logs_no_arguments");
|
||||
|
||||
assertThat(output.getCount(), is(1));
|
||||
assertTrue(execution.getState().isSuccess());
|
||||
assertThat(execution.getTaskRunList().size(), is(1));
|
||||
assertThat(execution.getTaskRunList().getFirst().getOutputs().get("count"), is(1));
|
||||
}
|
||||
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("buildArguments")
|
||||
@LoadFlows("flows/valids/purge_logs_full_arguments.yaml")
|
||||
void run_with_full_arguments(LogEntry logEntry, int resultCount, String failingReason) throws Exception {
|
||||
logRepository.save(logEntry);
|
||||
|
||||
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "purge_logs_full_arguments");
|
||||
|
||||
assertTrue(execution.getState().isSuccess());
|
||||
assertThat(execution.getTaskRunList().size(), is(1));
|
||||
assertThat(failingReason, execution.getTaskRunList().getFirst().getOutputs().get("count"), is(resultCount));
|
||||
}
|
||||
|
||||
static Stream<Arguments> buildArguments() {
|
||||
return Stream.of(
|
||||
Arguments.of(LogEntry.builder()
|
||||
.namespace("purge.namespace")
|
||||
.flowId("purgeFlowId")
|
||||
.timestamp(Instant.now().plus(5, ChronoUnit.HOURS))
|
||||
.level(Level.INFO)
|
||||
.message("Hello World")
|
||||
.build(), 0, "The log is too recent to be found"),
|
||||
Arguments.of(LogEntry.builder()
|
||||
.namespace("purge.namespace")
|
||||
.flowId("purgeFlowId")
|
||||
.timestamp(Instant.now().minus(5, ChronoUnit.HOURS))
|
||||
.level(Level.INFO)
|
||||
.message("Hello World")
|
||||
.build(), 0, "The log is too old to be found"),
|
||||
Arguments.of(LogEntry.builder()
|
||||
.namespace("uncorrect.namespace")
|
||||
.flowId("purgeFlowId")
|
||||
.timestamp(Instant.now().minusSeconds(10))
|
||||
.level(Level.INFO)
|
||||
.message("Hello World")
|
||||
.build(), 0, "The log has an incorrect namespace"),
|
||||
Arguments.of(LogEntry.builder()
|
||||
.namespace("purge.namespace")
|
||||
.flowId("wrongFlowId")
|
||||
.timestamp(Instant.now().minusSeconds(10))
|
||||
.level(Level.INFO)
|
||||
.message("Hello World")
|
||||
.build(), 0, "The log has an incorrect flow id"),
|
||||
Arguments.of(LogEntry.builder()
|
||||
.namespace("purge.namespace")
|
||||
.flowId("purgeFlowId")
|
||||
.timestamp(Instant.now().minusSeconds(10))
|
||||
.level(Level.WARN)
|
||||
.message("Hello World")
|
||||
.build(), 0, "The log has an incorrect LogLevel"),
|
||||
Arguments.of(LogEntry.builder()
|
||||
.namespace("purge.namespace")
|
||||
.flowId("purgeFlowId")
|
||||
.timestamp(Instant.now().minusSeconds(10))
|
||||
.level(Level.INFO)
|
||||
.message("Hello World")
|
||||
.build(), 1, "The log should be deleted")
|
||||
);
|
||||
}
|
||||
}
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user