mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
203 Commits
fix/kafka_
...
timeline-f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
93d53b9d57 | ||
|
|
11e5e14e4e | ||
|
|
72ce317c3d | ||
|
|
4da44013c1 | ||
|
|
c9995c6f42 | ||
|
|
a409299dd8 | ||
|
|
34cf67b0a4 | ||
|
|
d092556bc2 | ||
|
|
308106d532 | ||
|
|
8fe8f96278 | ||
|
|
a5cad6d87c | ||
|
|
199d67fbe2 | ||
|
|
558a2e3f01 | ||
|
|
e1d2c30e54 | ||
|
|
700c6de411 | ||
|
|
2b838a5012 | ||
|
|
617daa79db | ||
|
|
1791127acb | ||
|
|
7feb571fb3 | ||
|
|
a315bd0e1c | ||
|
|
e2ac1e7e98 | ||
|
|
c6f40eff52 | ||
|
|
ccd42f7a1a | ||
|
|
ef08c8ac30 | ||
|
|
7b527c85a9 | ||
|
|
d121867066 | ||
|
|
a084a9f6f0 | ||
|
|
f6fff11081 | ||
|
|
3d5015938f | ||
|
|
951c93cedb | ||
|
|
9c06b37989 | ||
|
|
a916a03fdd | ||
|
|
4e728da331 | ||
|
|
166a3932c9 | ||
|
|
0a21971bbf | ||
|
|
8c4d7c0f9e | ||
|
|
b709913071 | ||
|
|
5be401d23c | ||
|
|
bb9f4be8c2 | ||
|
|
01e8e46b77 | ||
|
|
d00f4b0768 | ||
|
|
279f59c874 | ||
|
|
d897509726 | ||
|
|
0d592342af | ||
|
|
fc690bf7cd | ||
|
|
0a1b919863 | ||
|
|
2f4e981a29 | ||
|
|
5e7739432e | ||
|
|
8aba863b8c | ||
|
|
7eaa43c50f | ||
|
|
267ff78bfe | ||
|
|
7272cfe01f | ||
|
|
91e2fdb2cc | ||
|
|
a236688be6 | ||
|
|
81763d40ae | ||
|
|
677efb6739 | ||
|
|
b35924fef1 | ||
|
|
9dd93294b6 | ||
|
|
fac6dfe9a0 | ||
|
|
3bf9764505 | ||
|
|
c35cea5d19 | ||
|
|
4d8e9479f1 | ||
|
|
3f24e8e838 | ||
|
|
7175fcb666 | ||
|
|
2ddfa13b1b | ||
|
|
ba2a5dfec8 | ||
|
|
f84441dac7 | ||
|
|
433b788e4a | ||
|
|
65c5fd6331 | ||
|
|
421ab40276 | ||
|
|
efb2779693 | ||
|
|
74d371c0ca | ||
|
|
90a7869020 | ||
|
|
d9ccb50b0f | ||
|
|
aea0b87ef8 | ||
|
|
9a144fc3fe | ||
|
|
ddd9cebc63 | ||
|
|
1bebbb9b73 | ||
|
|
8de4dc867e | ||
|
|
fc49694e76 | ||
|
|
152300abae | ||
|
|
1ff5dda4e1 | ||
|
|
84f9b8876d | ||
|
|
575955567f | ||
|
|
d6d2580b45 | ||
|
|
070e54b902 | ||
|
|
829ca4380f | ||
|
|
381c7a75ad | ||
|
|
1688c489a9 | ||
|
|
93ccbf5f9b | ||
|
|
ac1cb235e5 | ||
|
|
9d3d3642e8 | ||
|
|
3d306a885e | ||
|
|
ef193c5774 | ||
|
|
d0f46169f4 | ||
|
|
3005ab527c | ||
|
|
688e2af12b | ||
|
|
4c0a05f484 | ||
|
|
108f8fc2c7 | ||
|
|
8b81a37559 | ||
|
|
9222f97d63 | ||
|
|
43e3591417 | ||
|
|
438dc9ecf6 | ||
|
|
7292837c58 | ||
|
|
7fa93d7764 | ||
|
|
a3c9b35b25 | ||
|
|
2c03101422 | ||
|
|
7ee2cca3ae | ||
|
|
ddb48a4384 | ||
|
|
a62c5ab637 | ||
|
|
1b934d31f1 | ||
|
|
f887f53c25 | ||
|
|
098fa5a4ef | ||
|
|
c833ab6bc1 | ||
|
|
dfb7c61455 | ||
|
|
ed509d4461 | ||
|
|
8b50f191d8 | ||
|
|
7c5b531117 | ||
|
|
810579bce9 | ||
|
|
a0e7c50b28 | ||
|
|
bfbc3f70a4 | ||
|
|
9b5c4b0052 | ||
|
|
b7063d986b | ||
|
|
46ec0ae701 | ||
|
|
ba0615ba01 | ||
|
|
0a26098a91 | ||
|
|
0e2863e6fd | ||
|
|
4b6559203c | ||
|
|
b6993d71f2 | ||
|
|
ea6daf381d | ||
|
|
0649c1309b | ||
|
|
6e21d650f9 | ||
|
|
88acc91323 | ||
|
|
a822f3b372 | ||
|
|
4a3c6ee9e9 | ||
|
|
0ed4e5853d | ||
|
|
71cdd02230 | ||
|
|
fedddcde00 | ||
|
|
03f256cb9b | ||
|
|
f9317ba8ea | ||
|
|
eefca3d7a4 | ||
|
|
96b9e3c74b | ||
|
|
0a78778e5c | ||
|
|
5342948bfb | ||
|
|
f9beb0f4af | ||
|
|
70c1621025 | ||
|
|
a9098e7dc9 | ||
|
|
249839833c | ||
|
|
97ec24fc6a | ||
|
|
be5e24217b | ||
|
|
a5724bcb18 | ||
|
|
f3057d2d57 | ||
|
|
e8a953fc6b | ||
|
|
267f4fcc86 | ||
|
|
af1e2e3059 | ||
|
|
638d58697b | ||
|
|
0ae9374bf5 | ||
|
|
6a0344a09e | ||
|
|
e46b4a75d1 | ||
|
|
8fe1bae739 | ||
|
|
9ef59fdd23 | ||
|
|
79ab4415ad | ||
|
|
dd3829cc48 | ||
|
|
fa187904f9 | ||
|
|
c659599b1f | ||
|
|
85db1eafa7 | ||
|
|
9863f0807f | ||
|
|
b3b0d630cf | ||
|
|
97665449a8 | ||
|
|
2e1ed792e9 | ||
|
|
59e3ae5922 | ||
|
|
d46b54746a | ||
|
|
4fbaed744b | ||
|
|
b0638437d5 | ||
|
|
e3f9d0f8ff | ||
|
|
ae80738f33 | ||
|
|
2b4f208569 | ||
|
|
8acbc8ba03 | ||
|
|
d92cc099c7 | ||
|
|
3d806022bc | ||
|
|
b72bafe344 | ||
|
|
5e2063aa57 | ||
|
|
3838f8c87f | ||
|
|
55c89244b4 | ||
|
|
59c9ae57b7 | ||
|
|
1b8a2cd19a | ||
|
|
16992626d2 | ||
|
|
2c94922736 | ||
|
|
b45c0b13be | ||
|
|
aed055dcb1 | ||
|
|
cfe107705b | ||
|
|
58da5fe7d8 | ||
|
|
869f7c718c | ||
|
|
985ed2ac89 | ||
|
|
bf0361778d | ||
|
|
04a6adf012 | ||
|
|
0ffb71c25d | ||
|
|
63659eca79 | ||
|
|
357d4e0d69 | ||
|
|
cf301a1192 | ||
|
|
bc08fc7d07 | ||
|
|
85ac124740 | ||
|
|
8021257bf4 |
@@ -23,7 +23,15 @@ In the meantime, you can move onto the next step...
|
||||
|
||||
---
|
||||
|
||||
### Requirements
|
||||
|
||||
- Java 21 (LTS versions).
|
||||
> ⚠️ Java 24 and above are not supported yet and will fail with `invalid source release: 21`.
|
||||
- Gradle (comes with wrapper `./gradlew`)
|
||||
- Docker (optional, for running Kestra in containers)
|
||||
|
||||
### Development:
|
||||
|
||||
- (Optional) By default, your dev server will target `localhost:8080`. If your backend is running elsewhere, you can create `.env.development.local` under `ui` folder with this content:
|
||||
```
|
||||
VITE_APP_API_URL={myApiUrl}
|
||||
|
||||
4
.github/workflows/auto-translate-ui-keys.yml
vendored
4
.github/workflows/auto-translate-ui-keys.yml
vendored
@@ -26,7 +26,7 @@ jobs:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: "3.x"
|
||||
|
||||
@@ -39,7 +39,7 @@ jobs:
|
||||
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
|
||||
|
||||
- name: Set up Node
|
||||
uses: actions/setup-node@v4
|
||||
uses: actions/setup-node@v5
|
||||
with:
|
||||
node-version: "20.x"
|
||||
|
||||
|
||||
2
.github/workflows/e2e.yml
vendored
2
.github/workflows/e2e.yml
vendored
@@ -37,7 +37,7 @@ jobs:
|
||||
path: kestra
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/.github/actions/setup-build@main
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
name: Setup - Build
|
||||
id: build
|
||||
with:
|
||||
|
||||
18
.github/workflows/gradle-release-plugins.yml
vendored
18
.github/workflows/gradle-release-plugins.yml
vendored
@@ -4,7 +4,7 @@ on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
releaseVersion:
|
||||
description: 'The release version (e.g., 0.21.0-rc1)'
|
||||
description: 'The release version (e.g., 0.21.0)'
|
||||
required: true
|
||||
type: string
|
||||
nextVersion:
|
||||
@@ -25,21 +25,13 @@ jobs:
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
ref: main
|
||||
|
||||
# Setup build
|
||||
- uses: ./actions/.github/actions/setup-build
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: true
|
||||
node-enabled: true
|
||||
python-enabled: true
|
||||
caches-enabled: true
|
||||
|
||||
# Get Plugins List
|
||||
- name: Get Plugins List
|
||||
@@ -60,7 +52,7 @@ jobs:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./dev-tools/release-plugins.sh;
|
||||
|
||||
|
||||
./dev-tools/release-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--next-version=${{github.event.inputs.nextVersion}} \
|
||||
@@ -73,10 +65,10 @@ jobs:
|
||||
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
run: |
|
||||
chmod +x ./dev-tools/release-plugins.sh;
|
||||
|
||||
|
||||
./dev-tools/release-plugins.sh \
|
||||
--release-version=${{github.event.inputs.releaseVersion}} \
|
||||
--next-version=${{github.event.inputs.nextVersion}} \
|
||||
--dry-run \
|
||||
--yes \
|
||||
${{ steps.plugins-list.outputs.repositories }}
|
||||
${{ steps.plugins-list.outputs.repositories }}
|
||||
|
||||
18
.github/workflows/gradle-release.yml
vendored
18
.github/workflows/gradle-release.yml
vendored
@@ -4,7 +4,7 @@ on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
releaseVersion:
|
||||
description: 'The release version (e.g., 0.21.0-rc1)'
|
||||
description: 'The release version (e.g., 0.21.0)'
|
||||
required: true
|
||||
type: string
|
||||
nextVersion:
|
||||
@@ -23,8 +23,8 @@ jobs:
|
||||
# Checks
|
||||
- name: Check Inputs
|
||||
run: |
|
||||
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$ ]]; then
|
||||
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$"
|
||||
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0$ ]]; then
|
||||
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0$"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
@@ -38,15 +38,8 @@ jobs:
|
||||
fetch-depth: 0
|
||||
path: kestra
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
ref: main
|
||||
|
||||
# Setup build
|
||||
- uses: ./actions/.github/actions/setup-build
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: true
|
||||
@@ -78,7 +71,6 @@ jobs:
|
||||
git checkout develop;
|
||||
|
||||
if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
|
||||
# -SNAPSHOT qualifier maybe used to test release-candidates
|
||||
./gradlew release -Prelease.useAutomaticVersion=true \
|
||||
-Prelease.releaseVersion="${RELEASE_VERSION}" \
|
||||
-Prelease.newVersion="${NEXT_VERSION}" \
|
||||
@@ -89,4 +81,4 @@ jobs:
|
||||
-Prelease.releaseVersion="${RELEASE_VERSION}" \
|
||||
-Prelease.newVersion="${NEXT_VERSION}" \
|
||||
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
|
||||
fi
|
||||
fi
|
||||
|
||||
32
.github/workflows/kestra-devtools-test.yml
vendored
Normal file
32
.github/workflows/kestra-devtools-test.yml
vendored
Normal file
@@ -0,0 +1,32 @@
|
||||
name: kestra-devtools test
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
branches:
|
||||
- develop
|
||||
paths:
|
||||
- 'dev-tools/kestra-devtools/**'
|
||||
|
||||
env:
|
||||
# to save corepack from itself
|
||||
COREPACK_INTEGRITY_KEYS: 0
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: kestra-devtools tests
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v5
|
||||
|
||||
- name: Npm - install
|
||||
working-directory: 'dev-tools/kestra-devtools'
|
||||
run: npm ci
|
||||
|
||||
- name: Run tests
|
||||
working-directory: 'dev-tools/kestra-devtools'
|
||||
run: npm run test
|
||||
|
||||
- name: Npm - Run build
|
||||
working-directory: 'dev-tools/kestra-devtools'
|
||||
run: npm run build
|
||||
25
.github/workflows/main.yml
vendored
25
.github/workflows/main.yml
vendored
@@ -3,6 +3,14 @@ name: Main Workflow
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
skip-test:
|
||||
description: 'Skip test'
|
||||
type: choice
|
||||
required: true
|
||||
default: 'false'
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
plugin-version:
|
||||
description: "plugins version"
|
||||
required: false
|
||||
@@ -24,13 +32,14 @@ jobs:
|
||||
tests:
|
||||
name: Execute tests
|
||||
uses: ./.github/workflows/workflow-test.yml
|
||||
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
|
||||
with:
|
||||
report-status: false
|
||||
|
||||
release:
|
||||
name: Release
|
||||
needs: [tests]
|
||||
if: "!startsWith(github.ref, 'refs/heads/releases')"
|
||||
if: "!failure() && !cancelled() && !startsWith(github.ref, 'refs/heads/releases')"
|
||||
uses: ./.github/workflows/workflow-release.yml
|
||||
with:
|
||||
plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
|
||||
@@ -44,13 +53,12 @@ jobs:
|
||||
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
|
||||
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
- release
|
||||
if: always()
|
||||
env:
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
steps:
|
||||
- name: Trigger EE Workflow
|
||||
uses: peter-evans/repository-dispatch@v3
|
||||
@@ -60,14 +68,9 @@ jobs:
|
||||
repository: kestra-io/kestra-ee
|
||||
event-type: "oss-updated"
|
||||
|
||||
|
||||
# Slack
|
||||
- name: Slack - Notification
|
||||
uses: Gamesight/slack-workflow-status@master
|
||||
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
|
||||
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
|
||||
uses: kestra-io/actions/composite/slack-status@main
|
||||
with:
|
||||
repo_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
name: GitHub Actions
|
||||
icon_emoji: ":github-actions:"
|
||||
channel: "C02DQ1A7JLR" # _int_git channel
|
||||
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
|
||||
16
.github/workflows/pull-request.yml
vendored
16
.github/workflows/pull-request.yml
vendored
@@ -60,19 +60,3 @@ jobs:
|
||||
name: E2E - Tests
|
||||
uses: ./.github/workflows/e2e.yml
|
||||
|
||||
end:
|
||||
name: End
|
||||
runs-on: ubuntu-latest
|
||||
if: always()
|
||||
needs: [frontend, backend]
|
||||
steps:
|
||||
# Slack
|
||||
- name: Slack notification
|
||||
uses: Gamesight/slack-workflow-status@master
|
||||
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
|
||||
with:
|
||||
repo_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
name: GitHub Actions
|
||||
icon_emoji: ":github-actions:"
|
||||
channel: "C02DQ1A7JLR"
|
||||
9
.github/workflows/setversion-tag.yml
vendored
9
.github/workflows/setversion-tag.yml
vendored
@@ -34,11 +34,14 @@ jobs:
|
||||
fi
|
||||
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
token: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
|
||||
- name: Configure Git
|
||||
# Configure
|
||||
- name: Git - Configure
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
@@ -54,4 +57,4 @@ jobs:
|
||||
git commit -m"chore(version): update to version '$RELEASE_VERSION'"
|
||||
git push
|
||||
git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
|
||||
git push --tags
|
||||
git push --tags
|
||||
|
||||
30
.github/workflows/vulnerabilities-check.yml
vendored
30
.github/workflows/vulnerabilities-check.yml
vendored
@@ -21,13 +21,6 @@ jobs:
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
ref: main
|
||||
|
||||
# Setup build
|
||||
- uses: ./actions/.github/actions/setup-build
|
||||
id: build
|
||||
@@ -70,15 +63,8 @@ jobs:
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
ref: main
|
||||
|
||||
# Setup build
|
||||
- uses: ./actions/.github/actions/setup-build
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: false
|
||||
@@ -87,7 +73,7 @@ jobs:
|
||||
|
||||
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
|
||||
- name: Docker Vulnerabilities Check
|
||||
uses: aquasecurity/trivy-action@0.32.0
|
||||
uses: aquasecurity/trivy-action@0.33.1
|
||||
with:
|
||||
image-ref: kestra/kestra:develop
|
||||
format: 'template'
|
||||
@@ -115,24 +101,16 @@ jobs:
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
ref: main
|
||||
|
||||
# Setup build
|
||||
- uses: ./actions/.github/actions/setup-build
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: false
|
||||
node-enabled: false
|
||||
caches-enabled: true
|
||||
|
||||
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
|
||||
- name: Docker Vulnerabilities Check
|
||||
uses: aquasecurity/trivy-action@0.32.0
|
||||
uses: aquasecurity/trivy-action@0.33.1
|
||||
with:
|
||||
image-ref: kestra/kestra:latest
|
||||
format: table
|
||||
|
||||
31
.github/workflows/workflow-backend-test.yml
vendored
31
.github/workflows/workflow-backend-test.yml
vendored
@@ -20,6 +20,7 @@ permissions:
|
||||
contents: write
|
||||
checks: write
|
||||
actions: read
|
||||
pull-requests: write
|
||||
|
||||
jobs:
|
||||
test:
|
||||
@@ -35,7 +36,7 @@ jobs:
|
||||
fetch-depth: 0
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/.github/actions/setup-build@main
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
name: Setup - Build
|
||||
id: build
|
||||
with:
|
||||
@@ -59,6 +60,30 @@ jobs:
|
||||
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
|
||||
./gradlew check javadoc --parallel
|
||||
|
||||
- name: comment PR with test report
|
||||
if: ${{ !cancelled() && github.event_name == 'pull_request' }}
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
|
||||
run: |
|
||||
export KESTRA_PWD=$(pwd) && sh -c 'cd dev-tools/kestra-devtools && npm ci && npm run build && node dist/kestra-devtools-cli.cjs generateTestReportSummary --only-errors --ci $KESTRA_PWD' > report.md
|
||||
cat report.md
|
||||
# Gradle check
|
||||
- name: 'generate Timeline flamegraph'
|
||||
if: always()
|
||||
env:
|
||||
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
|
||||
shell: bash
|
||||
run: |
|
||||
echo $GOOGLE_SERVICE_ACCOUNT | base64 -d > ~/.gcp-service-account.json
|
||||
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
|
||||
./gradlew mergeTestTimeline
|
||||
- name: 'Upload Timeline flamegraph'
|
||||
uses: actions/upload-artifact@v4
|
||||
if: always()
|
||||
with:
|
||||
name: all-test-timelines.json
|
||||
path: build/reports/test-timelines-report/all-test-timelines.json
|
||||
retention-days: 5
|
||||
# report test
|
||||
- name: Test - Publish Test Results
|
||||
uses: dorny/test-reporter@v2
|
||||
@@ -86,13 +111,13 @@ jobs:
|
||||
id: auth
|
||||
if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
|
||||
continue-on-error: true
|
||||
uses: "google-github-actions/auth@v2"
|
||||
uses: "google-github-actions/auth@v3"
|
||||
with:
|
||||
credentials_json: "${{ secrets.GOOGLE_SERVICE_ACCOUNT }}"
|
||||
|
||||
- name: GCP - Setup Cloud SDK
|
||||
if: env.GOOGLE_SERVICE_ACCOUNT != ''
|
||||
uses: "google-github-actions/setup-gcloud@v2"
|
||||
uses: "google-github-actions/setup-gcloud@v3"
|
||||
|
||||
# Allure check
|
||||
- uses: rlespinasse/github-slug-action@v5
|
||||
|
||||
@@ -26,7 +26,7 @@ jobs:
|
||||
run: npm ci
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/.github/actions/setup-build@main
|
||||
- uses: kestra-io/actions/composite/setup-build@main
|
||||
name: Setup - Build
|
||||
id: build
|
||||
with:
|
||||
|
||||
13
.github/workflows/workflow-github-release.yml
vendored
13
.github/workflows/workflow-github-release.yml
vendored
@@ -25,15 +25,6 @@ jobs:
|
||||
fetch-depth: 0
|
||||
submodules: true
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- name: Checkout - Actions
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
sparse-checkout-cone-mode: true
|
||||
path: actions
|
||||
sparse-checkout: |
|
||||
.github/actions
|
||||
|
||||
# Download Exec
|
||||
# Must be done after checkout actions
|
||||
@@ -59,7 +50,7 @@ jobs:
|
||||
|
||||
# GitHub Release
|
||||
- name: Create GitHub release
|
||||
uses: ./actions/.github/actions/github-release
|
||||
uses: kestra-io/actions/composite/github-release@main
|
||||
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
|
||||
env:
|
||||
MAKE_LATEST: ${{ steps.is_latest.outputs.latest }}
|
||||
@@ -82,7 +73,7 @@ jobs:
|
||||
|
||||
- name: Merge Release Notes
|
||||
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
|
||||
uses: ./actions/.github/actions/github-release-note-merge
|
||||
uses: kestra-io/actions/composite/github-release-note-merge@main
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
RELEASE_TAG: ${{ github.ref_name }}
|
||||
|
||||
26
.github/workflows/workflow-publish-docker.yml
vendored
26
.github/workflows/workflow-publish-docker.yml
vendored
@@ -11,6 +11,14 @@ on:
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
retag-lts:
|
||||
description: 'Retag LTS Docker images'
|
||||
required: true
|
||||
type: choice
|
||||
default: "false"
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
release-tag:
|
||||
description: 'Kestra Release Tag (by default, deduced with the ref)'
|
||||
required: false
|
||||
@@ -179,6 +187,11 @@ jobs:
|
||||
run: |
|
||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
|
||||
|
||||
- name: Retag to LTS
|
||||
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-lts == 'true'
|
||||
run: |
|
||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest-lts{0}', matrix.image.name) }}
|
||||
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
@@ -187,14 +200,9 @@ jobs:
|
||||
env:
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
steps:
|
||||
|
||||
# Slack
|
||||
- name: Slack notification
|
||||
uses: Gamesight/slack-workflow-status@master
|
||||
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
|
||||
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 }}
|
||||
uses: kestra-io/actions/composite/slack-status@main
|
||||
with:
|
||||
repo_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
name: GitHub Actions
|
||||
icon_emoji: ':github-actions:'
|
||||
channel: 'C02DQ1A7JLR' # _int_git channel
|
||||
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
|
||||
|
||||
2
.github/workflows/workflow-publish-maven.yml
vendored
2
.github/workflows/workflow-publish-maven.yml
vendored
@@ -29,7 +29,7 @@ jobs:
|
||||
|
||||
# Setup build
|
||||
- name: Setup - Build
|
||||
uses: kestra-io/actions/.github/actions/setup-build@main
|
||||
uses: kestra-io/actions/composite/setup-build@main
|
||||
id: build
|
||||
with:
|
||||
java-enabled: true
|
||||
|
||||
@@ -7,7 +7,7 @@ on:
|
||||
jobs:
|
||||
publish:
|
||||
name: Pull Request - Delete Docker
|
||||
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
|
||||
if: github.repository == 'kestra-io/kestra' # prevent running on forks
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: dataaxiom/ghcr-cleanup-action@v1
|
||||
|
||||
@@ -8,12 +8,12 @@ on:
|
||||
jobs:
|
||||
build-artifacts:
|
||||
name: Build Artifacts
|
||||
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
|
||||
if: github.repository == 'kestra-io/kestra' # prevent running on forks
|
||||
uses: ./.github/workflows/workflow-build-artifacts.yml
|
||||
|
||||
publish:
|
||||
name: Publish Docker
|
||||
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
|
||||
if: github.repository == 'kestra-io/kestra' # prevent running on forks
|
||||
runs-on: ubuntu-latest
|
||||
needs: build-artifacts
|
||||
env:
|
||||
@@ -62,7 +62,7 @@ jobs:
|
||||
|
||||
# Add comment on pull request
|
||||
- name: Add comment to PR
|
||||
uses: actions/github-script@v7
|
||||
uses: actions/github-script@v8
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
script: |
|
||||
|
||||
14
.github/workflows/workflow-test.yml
vendored
14
.github/workflows/workflow-test.yml
vendored
@@ -84,14 +84,12 @@ jobs:
|
||||
name: Notify - Slack
|
||||
runs-on: ubuntu-latest
|
||||
needs: [ frontend, backend ]
|
||||
if: github.event_name == 'schedule'
|
||||
steps:
|
||||
- name: Notify failed CI
|
||||
id: send-ci-failed
|
||||
if: |
|
||||
always() && (needs.frontend.result != 'success' ||
|
||||
needs.backend.result != 'success')
|
||||
uses: kestra-io/actions/.github/actions/send-ci-failed@main
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
always() &&
|
||||
(needs.frontend.result != 'success' || needs.backend.result != 'success') &&
|
||||
(github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop')
|
||||
uses: kestra-io/actions/composite/slack-status@main
|
||||
with:
|
||||
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
|
||||
4
.plugins
4
.plugins
@@ -36,6 +36,7 @@
|
||||
#plugin-gemini:io.kestra.plugin:plugin-gemini:LATEST
|
||||
#plugin-git:io.kestra.plugin:plugin-git:LATEST
|
||||
#plugin-github:io.kestra.plugin:plugin-github:LATEST
|
||||
#plugin-gitlab:io.kestra.plugin:plugin-gitlab:LATEST
|
||||
#plugin-googleworkspace:io.kestra.plugin:plugin-googleworkspace:LATEST
|
||||
#plugin-graalvm:io.kestra.plugin:plugin-graalvm:LATEST
|
||||
#plugin-graphql:io.kestra.plugin:plugin-graphql:LATEST
|
||||
@@ -108,16 +109,17 @@
|
||||
#plugin-serdes:io.kestra.plugin:plugin-serdes:LATEST
|
||||
#plugin-servicenow:io.kestra.plugin:plugin-servicenow:LATEST
|
||||
#plugin-sifflet:io.kestra.plugin:plugin-sifflet:LATEST
|
||||
#plugin-singer:io.kestra.plugin:plugin-singer:LATEST
|
||||
#plugin-soda:io.kestra.plugin:plugin-soda:LATEST
|
||||
#plugin-solace:io.kestra.plugin:plugin-solace:LATEST
|
||||
#plugin-spark:io.kestra.plugin:plugin-spark:LATEST
|
||||
#plugin-sqlmesh:io.kestra.plugin:plugin-sqlmesh:LATEST
|
||||
#plugin-supabase:io.kestra.plugin:plugin-supabase:LATEST
|
||||
#plugin-surrealdb:io.kestra.plugin:plugin-surrealdb:LATEST
|
||||
#plugin-terraform:io.kestra.plugin:plugin-terraform:LATEST
|
||||
#plugin-transform:io.kestra.plugin:plugin-transform-grok:LATEST
|
||||
#plugin-transform:io.kestra.plugin:plugin-transform-json:LATEST
|
||||
#plugin-tika:io.kestra.plugin:plugin-tika:LATEST
|
||||
#plugin-trivy:io.kestra.plugin:plugin-trivy:LATEST
|
||||
#plugin-weaviate:io.kestra.plugin:plugin-weaviate:LATEST
|
||||
#plugin-zendesk:io.kestra.plugin:plugin-zendesk:LATEST
|
||||
#plugin-typesense:io.kestra.plugin:plugin-typesense:LATEST
|
||||
|
||||
2
Makefile
2
Makefile
@@ -89,7 +89,7 @@ build-docker: build-exec
|
||||
--compress \
|
||||
--rm \
|
||||
-f ./Dockerfile \
|
||||
--build-arg="APT_PACKAGES=python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach" \
|
||||
--build-arg="APT_PACKAGES=python3 python-is-python3 python3-pip curl jattach" \
|
||||
--build-arg="PYTHON_LIBRARIES=kestra" \
|
||||
-t ${DOCKER_IMAGE}:${VERSION} ${DOCKER_PATH} || exit 1 ;
|
||||
|
||||
|
||||
108
build.gradle
108
build.gradle
@@ -21,7 +21,7 @@ plugins {
|
||||
|
||||
// test
|
||||
id "com.adarshr.test-logger" version "4.0.0"
|
||||
id "org.sonarqube" version "6.2.0.5505"
|
||||
id "org.sonarqube" version "6.3.1.5724"
|
||||
id 'jacoco-report-aggregation'
|
||||
|
||||
// helper
|
||||
@@ -32,7 +32,7 @@ plugins {
|
||||
|
||||
// release
|
||||
id 'net.researchgate.release' version '3.1.0'
|
||||
id "com.gorylenko.gradle-git-properties" version "2.5.2"
|
||||
id "com.gorylenko.gradle-git-properties" version "2.5.3"
|
||||
id 'signing'
|
||||
id "com.vanniktech.maven.publish" version "0.34.0"
|
||||
|
||||
@@ -168,8 +168,9 @@ allprojects {
|
||||
/**********************************************************************************************************************\
|
||||
* Test
|
||||
**********************************************************************************************************************/
|
||||
subprojects {
|
||||
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
|
||||
subprojects {subProj ->
|
||||
|
||||
if (subProj.name != 'platform' && subProj.name != 'jmh-benchmarks') {
|
||||
apply plugin: "com.adarshr.test-logger"
|
||||
|
||||
java {
|
||||
@@ -207,6 +208,13 @@ subprojects {
|
||||
|
||||
test {
|
||||
useJUnitPlatform()
|
||||
reports {
|
||||
junitXml.required = true
|
||||
junitXml.outputPerTestCase = true
|
||||
junitXml.mergeReruns = true
|
||||
junitXml.includeSystemErrLog = true;
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
|
||||
}
|
||||
|
||||
// set Xmx for test workers
|
||||
maxHeapSize = '4g'
|
||||
@@ -222,6 +230,52 @@ subprojects {
|
||||
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
|
||||
environment 'ENV_TEST1', "true"
|
||||
environment 'ENV_TEST2', "Pass by env"
|
||||
|
||||
// === Test Timeline Trace (Chrome trace format) ===
|
||||
// Produces per-JVM ndjson under build/test-timelines/*.jsonl and a merged array via :mergeTestTimeline
|
||||
// Each event has: start time (ts, µs since epoch), end via dur, and absolute duration (dur, µs)
|
||||
doFirst {
|
||||
file("${buildDir}/test-results/test-timelines").mkdirs()
|
||||
}
|
||||
|
||||
def jvmName = java.lang.management.ManagementFactory.runtimeMXBean.name
|
||||
def pid = jvmName.tokenize('@')[0]
|
||||
def traceDir = file("${buildDir}/test-results/test-timelines")
|
||||
def traceFile = new File(traceDir, "${project.name}-${name}-${pid}.jsonl")
|
||||
def starts = new java.util.concurrent.ConcurrentHashMap<Object, Long>()
|
||||
|
||||
beforeTest { org.gradle.api.tasks.testing.TestDescriptor d ->
|
||||
// epoch millis to allow cross-JVM merge
|
||||
starts.put(d, System.currentTimeMillis())
|
||||
}
|
||||
afterTest { org.gradle.api.tasks.testing.TestDescriptor d, org.gradle.api.tasks.testing.TestResult r ->
|
||||
def st = starts.remove(d)
|
||||
if (st != null) {
|
||||
def en = System.currentTimeMillis()
|
||||
long tsMicros = st * 1000L // start time (µs since epoch)
|
||||
long durMicros = (en - st) * 1000L // duration (µs)
|
||||
def ev = [
|
||||
name: (d.className ? d.className + '.' + d.name : d.name),
|
||||
cat : 'test',
|
||||
ph : 'X', // Complete event with duration
|
||||
ts : tsMicros,
|
||||
dur : durMicros,
|
||||
pid : project.name, // group by project/module
|
||||
tid : "${name}-worker-${pid}",
|
||||
args: [result: r.resultType.toString()]
|
||||
]
|
||||
synchronized (traceFile.absolutePath.intern()) {
|
||||
traceFile << (groovy.json.JsonOutput.toJson(ev) + System.lineSeparator())
|
||||
}
|
||||
}
|
||||
}
|
||||
if (subProj.name == 'core' || subProj.name == 'jdbc-h2') {
|
||||
// JUnit 5 parallel settings
|
||||
systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
|
||||
systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
|
||||
systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
|
||||
systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
|
||||
}
|
||||
}
|
||||
|
||||
testlogger {
|
||||
@@ -236,7 +290,53 @@ subprojects {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Root-level aggregator: merge timelines from ALL modules into one Chrome trace
|
||||
if (project == rootProject) {
|
||||
tasks.register('mergeTestTimeline') {
|
||||
group = 'verification'
|
||||
description = 'Merge per-worker test timeline ndjson from all modules into a single Chrome Trace JSON array.'
|
||||
doLast {
|
||||
def collectedFiles = [] as List<File>
|
||||
|
||||
// Collect *.jsonl files from every subproject
|
||||
rootProject.subprojects.each { p ->
|
||||
def dir = p.file("${p.buildDir}/test-results/test-timelines")
|
||||
if (dir.exists()) {
|
||||
collectedFiles.addAll(p.fileTree(dir: dir, include: '*.jsonl').files)
|
||||
}
|
||||
}
|
||||
|
||||
if (collectedFiles.isEmpty()) {
|
||||
logger.lifecycle("No timeline files found in any subproject. Run tests first (e.g., './gradlew test --parallel').")
|
||||
return
|
||||
}
|
||||
|
||||
collectedFiles = collectedFiles.sort { it.name }
|
||||
|
||||
def outDir = rootProject.file("${rootProject.buildDir}/reports/test-timelines-report")
|
||||
outDir.mkdirs()
|
||||
def out = new File(outDir, "all-test-timelines.json")
|
||||
|
||||
out.withWriter('UTF-8') { w ->
|
||||
w << '['
|
||||
boolean first = true
|
||||
collectedFiles.each { f ->
|
||||
f.eachLine { line ->
|
||||
def trimmed = line?.trim()
|
||||
if (trimmed) {
|
||||
if (!first) w << ','
|
||||
w << trimmed
|
||||
first = false
|
||||
}
|
||||
}
|
||||
}
|
||||
w << ']'
|
||||
}
|
||||
|
||||
logger.lifecycle("Merged ${collectedFiles.size()} files into ${out} — open it in chrome://tracing or Perfetto UI.")
|
||||
}
|
||||
}
|
||||
}
|
||||
/**********************************************************************************************************************\
|
||||
* End-to-End Tests
|
||||
**********************************************************************************************************************/
|
||||
|
||||
@@ -33,8 +33,13 @@ dependencies {
|
||||
|
||||
implementation project(":storage-local")
|
||||
|
||||
// Kestra server components
|
||||
implementation project(":executor")
|
||||
implementation project(":scheduler")
|
||||
implementation project(":webserver")
|
||||
implementation project(":worker")
|
||||
|
||||
//test
|
||||
testImplementation project(':tests')
|
||||
testImplementation "org.wiremock:wiremock-jetty12"
|
||||
}
|
||||
@@ -89,11 +89,24 @@ public class App implements Callable<Integer> {
|
||||
*/
|
||||
protected static ApplicationContext applicationContext(Class<?> mainClass,
|
||||
String[] args) {
|
||||
return applicationContext(mainClass, new String [] { Environment.CLI }, args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an {@link ApplicationContext} with additional properties based on configuration files (--config) and
|
||||
* forced Properties from current command.
|
||||
*
|
||||
* @param args args passed to java app
|
||||
* @return the application context created
|
||||
*/
|
||||
protected static ApplicationContext applicationContext(Class<?> mainClass,
|
||||
String[] environments,
|
||||
String[] args) {
|
||||
|
||||
ApplicationContextBuilder builder = ApplicationContext
|
||||
.builder()
|
||||
.mainClass(mainClass)
|
||||
.environments(Environment.CLI);
|
||||
.environments(environments);
|
||||
|
||||
CommandLine cmd = new CommandLine(mainClass, CommandLine.defaultFactory());
|
||||
continueOnParsingErrors(cmd);
|
||||
|
||||
@@ -1,15 +1,14 @@
|
||||
package io.kestra.core.runners;
|
||||
package io.kestra.cli;
|
||||
|
||||
import io.kestra.core.schedulers.AbstractScheduler;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.core.server.Service;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.ExecutorsUtils;
|
||||
import io.kestra.worker.DefaultWorker;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@@ -24,9 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
@SuppressWarnings("try")
|
||||
@Slf4j
|
||||
@Singleton
|
||||
@Requires(missingBeans = RunnerInterface.class)
|
||||
public class StandAloneRunner implements RunnerInterface, AutoCloseable {
|
||||
public class StandAloneRunner implements Runnable, AutoCloseable {
|
||||
@Setter protected int workerThread = Math.max(3, Runtime.getRuntime().availableProcessors());
|
||||
@Setter protected boolean schedulerEnabled = true;
|
||||
@Setter protected boolean workerEnabled = true;
|
||||
@@ -45,7 +42,7 @@ public class StandAloneRunner implements RunnerInterface, AutoCloseable {
|
||||
|
||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||
|
||||
private volatile ExecutorService poolExecutor;
|
||||
private ExecutorService poolExecutor;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
@@ -57,20 +54,20 @@ public class StandAloneRunner implements RunnerInterface, AutoCloseable {
|
||||
if (workerEnabled) {
|
||||
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils
|
||||
String workerID = UUID.randomUUID().toString();
|
||||
Worker worker = applicationContext.createBean(Worker.class, workerID, workerThread, null);
|
||||
Worker worker = applicationContext.createBean(DefaultWorker.class, workerID, workerThread, null);
|
||||
applicationContext.registerSingleton(worker); //
|
||||
poolExecutor.execute(worker);
|
||||
servers.add(worker);
|
||||
}
|
||||
|
||||
if (schedulerEnabled) {
|
||||
AbstractScheduler scheduler = applicationContext.getBean(AbstractScheduler.class);
|
||||
Scheduler scheduler = applicationContext.getBean(Scheduler.class);
|
||||
poolExecutor.execute(scheduler);
|
||||
servers.add(scheduler);
|
||||
}
|
||||
|
||||
if (indexerEnabled) {
|
||||
IndexerInterface indexer = applicationContext.getBean(IndexerInterface.class);
|
||||
Indexer indexer = applicationContext.getBean(Indexer.class);
|
||||
poolExecutor.execute(indexer);
|
||||
servers.add(indexer);
|
||||
}
|
||||
@@ -8,7 +8,7 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
|
||||
import io.kestra.core.runners.FlowInputOutput;
|
||||
import io.kestra.core.runners.RunnerUtils;
|
||||
import io.kestra.core.runners.StandAloneRunner;
|
||||
import io.kestra.cli.StandAloneRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
@@ -72,7 +72,6 @@ public class FlowTestCommand extends AbstractApiCommand {
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
StandAloneRunner runner = applicationContext.getBean(StandAloneRunner.class);
|
||||
LocalFlowRepositoryLoader repositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
|
||||
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
FlowInputOutput flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
|
||||
@@ -89,7 +88,7 @@ public class FlowTestCommand extends AbstractApiCommand {
|
||||
inputs.put(this.inputs.get(i), this.inputs.get(i+1));
|
||||
}
|
||||
|
||||
try {
|
||||
try (StandAloneRunner runner = applicationContext.createBean(StandAloneRunner.class);){
|
||||
runner.run();
|
||||
repositoryLoader.load(tenantService.getTenantId(tenantId), file.toFile());
|
||||
|
||||
@@ -103,8 +102,6 @@ public class FlowTestCommand extends AbstractApiCommand {
|
||||
(flow, execution) -> flowInputOutput.readExecutionInputs(flow, execution, inputs),
|
||||
Duration.ofHours(1)
|
||||
);
|
||||
|
||||
runner.close();
|
||||
} catch (ConstraintViolationException e) {
|
||||
throw new CommandLine.ParameterException(this.spec.commandLine(), e.getMessage());
|
||||
} catch (IOException | TimeoutException e) {
|
||||
|
||||
@@ -3,7 +3,7 @@ package io.kestra.cli.commands.servers;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.runners.ExecutorInterface;
|
||||
import io.kestra.core.services.SkipExecutionService;
|
||||
import io.kestra.executor.SkipExecutionService;
|
||||
import io.kestra.core.services.StartExecutorService;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
|
||||
@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.runners.IndexerInterface;
|
||||
import io.kestra.core.runners.Indexer;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -29,7 +29,7 @@ public class IndexerCommand extends AbstractServerCommand {
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
IndexerInterface indexer = applicationContext.getBean(IndexerInterface.class);
|
||||
Indexer indexer = applicationContext.getBean(Indexer.class);
|
||||
indexer.run();
|
||||
|
||||
Await.until(() -> !this.applicationContext.isRunning());
|
||||
|
||||
@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.schedulers.AbstractScheduler;
|
||||
import io.kestra.scheduler.AbstractScheduler;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@@ -6,8 +6,8 @@ import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
|
||||
import io.kestra.core.runners.StandAloneRunner;
|
||||
import io.kestra.core.services.SkipExecutionService;
|
||||
import io.kestra.cli.StandAloneRunner;
|
||||
import io.kestra.executor.SkipExecutionService;
|
||||
import io.kestra.core.services.StartExecutorService;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
@@ -109,26 +109,27 @@ public class StandAloneCommand extends AbstractServerCommand {
|
||||
}
|
||||
}
|
||||
|
||||
StandAloneRunner standAloneRunner = applicationContext.getBean(StandAloneRunner.class);
|
||||
try (StandAloneRunner standAloneRunner = applicationContext.getBean(StandAloneRunner.class)) {
|
||||
|
||||
if (this.workerThread == 0) {
|
||||
standAloneRunner.setWorkerEnabled(false);
|
||||
} else {
|
||||
standAloneRunner.setWorkerThread(this.workerThread);
|
||||
if (this.workerThread == 0) {
|
||||
standAloneRunner.setWorkerEnabled(false);
|
||||
} else {
|
||||
standAloneRunner.setWorkerThread(this.workerThread);
|
||||
}
|
||||
|
||||
if (this.indexerDisabled) {
|
||||
standAloneRunner.setIndexerEnabled(false);
|
||||
}
|
||||
|
||||
standAloneRunner.run();
|
||||
|
||||
if (fileWatcher != null) {
|
||||
fileWatcher.startListeningFromConfig();
|
||||
}
|
||||
|
||||
Await.until(() -> !this.applicationContext.isRunning());
|
||||
}
|
||||
|
||||
if (this.indexerDisabled) {
|
||||
standAloneRunner.setIndexerEnabled(false);
|
||||
}
|
||||
|
||||
standAloneRunner.run();
|
||||
|
||||
if (fileWatcher != null) {
|
||||
fileWatcher.startListeningFromConfig();
|
||||
}
|
||||
|
||||
Await.until(() -> !this.applicationContext.isRunning());
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.runners.IndexerInterface;
|
||||
import io.kestra.core.runners.Indexer;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.ExecutorsUtils;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
@@ -54,7 +54,7 @@ public class WebServerCommand extends AbstractServerCommand {
|
||||
if (!indexerDisabled) {
|
||||
log.info("Starting an embedded indexer, this can be disabled by using `--no-indexer`.");
|
||||
poolExecutor = executorsUtils.cachedThreadPool("webserver-indexer");
|
||||
poolExecutor.execute(applicationContext.getBean(IndexerInterface.class));
|
||||
poolExecutor.execute(applicationContext.getBean(Indexer.class));
|
||||
shutdownHook(false, () -> poolExecutor.shutdown());
|
||||
}
|
||||
|
||||
|
||||
@@ -262,6 +262,8 @@ public class FileChangedEventListener {
|
||||
}
|
||||
|
||||
private String getTenantIdFromPath(Path path) {
|
||||
// FIXME there is probably a bug here when a tenant has '_' in its name,
|
||||
// a valid tenant name is defined with following regex: "^[a-z0-9][a-z0-9_-]*"
|
||||
return path.getFileName().toString().split("_")[0];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,10 @@ micronaut:
|
||||
root:
|
||||
paths: classpath:root
|
||||
mapping: /**
|
||||
codec:
|
||||
json:
|
||||
additional-types:
|
||||
- application/scim+json
|
||||
server:
|
||||
max-request-size: 10GB
|
||||
multipart:
|
||||
@@ -78,8 +82,19 @@ micronaut:
|
||||
type: scheduled
|
||||
core-pool-size: 1
|
||||
|
||||
# Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
|
||||
metrics:
|
||||
binders:
|
||||
retry:
|
||||
enabled: true
|
||||
netty:
|
||||
queues:
|
||||
enabled: true
|
||||
bytebuf-allocators:
|
||||
enabled: true
|
||||
channels:
|
||||
enabled: true
|
||||
|
||||
# Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
|
||||
export:
|
||||
otlp:
|
||||
enabled: false
|
||||
@@ -92,6 +107,8 @@ jackson:
|
||||
serialization-inclusion: non_null
|
||||
deserialization:
|
||||
FAIL_ON_UNKNOWN_PROPERTIES: false
|
||||
mapper:
|
||||
ACCEPT_CASE_INSENSITIVE_ENUMS: true
|
||||
|
||||
endpoints:
|
||||
all:
|
||||
@@ -100,6 +117,10 @@ endpoints:
|
||||
sensitive: false
|
||||
health:
|
||||
details-visible: ANONYMOUS
|
||||
disk-space:
|
||||
enabled: false
|
||||
discovery-client:
|
||||
enabled: false
|
||||
loggers:
|
||||
write-sensitive: false
|
||||
env:
|
||||
@@ -133,12 +154,44 @@ kestra:
|
||||
tutorial-flows:
|
||||
# Automatically loads all tutorial flows at startup.
|
||||
enabled: true
|
||||
|
||||
retries:
|
||||
attempts: 5
|
||||
multiplier: 2.0
|
||||
delay: 1s
|
||||
maxDelay: ""
|
||||
|
||||
server:
|
||||
basic-auth:
|
||||
# These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
|
||||
open-urls:
|
||||
- "/ping"
|
||||
- "/api/v1/executions/webhook/"
|
||||
|
||||
preview:
|
||||
initial-rows: 100
|
||||
max-rows: 5000
|
||||
|
||||
# The expected time for this server to complete all its tasks before initiating a graceful shutdown.
|
||||
terminationGracePeriod: 5m
|
||||
workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
|
||||
# Configuration for Liveness and Heartbeat mechanism between servers.
|
||||
liveness:
|
||||
enabled: true
|
||||
# The expected time between liveness probe.
|
||||
interval: 10s
|
||||
# The timeout used to detect service failures.
|
||||
timeout: 1m
|
||||
# The time to wait before executing a liveness probe.
|
||||
initialDelay: 1m
|
||||
# The expected time between service heartbeats.
|
||||
heartbeatInterval: 3s
|
||||
service:
|
||||
purge:
|
||||
initial-delay: 1h
|
||||
fixed-delay: 1d
|
||||
retention: 30d
|
||||
|
||||
jdbc:
|
||||
queues:
|
||||
min-poll-interval: 25ms
|
||||
@@ -150,7 +203,7 @@ kestra:
|
||||
fixed-delay: 1h
|
||||
retention: 7d
|
||||
types:
|
||||
- type : io.kestra.core.models.executions.LogEntry
|
||||
- type: io.kestra.core.models.executions.LogEntry
|
||||
retention: 1h
|
||||
- type: io.kestra.core.models.executions.MetricEntry
|
||||
retention: 1h
|
||||
@@ -182,37 +235,12 @@ kestra:
|
||||
traces:
|
||||
root: DISABLED
|
||||
|
||||
server:
|
||||
basic-auth:
|
||||
# These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
|
||||
open-urls:
|
||||
- "/ping"
|
||||
- "/api/v1/executions/webhook/"
|
||||
preview:
|
||||
initial-rows: 100
|
||||
max-rows: 5000
|
||||
# The expected time for this server to complete all its tasks before initiating a graceful shutdown.
|
||||
terminationGracePeriod: 5m
|
||||
workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
|
||||
# Configuration for Liveness and Heartbeat mechanism between servers.
|
||||
liveness:
|
||||
enabled: true
|
||||
# The expected time between liveness probe.
|
||||
interval: 10s
|
||||
# The timeout used to detect service failures.
|
||||
timeout: 1m
|
||||
# The time to wait before executing a liveness probe.
|
||||
initialDelay: 1m
|
||||
# The expected time between service heartbeats.
|
||||
heartbeatInterval: 3s
|
||||
service:
|
||||
purge:
|
||||
initial-delay: 1h
|
||||
fixed-delay: 1d
|
||||
retention: 30d
|
||||
ui-anonymous-usage-report:
|
||||
enabled: true
|
||||
|
||||
anonymous-usage-report:
|
||||
enabled: true
|
||||
uri: https://api.kestra.io/v1/server-events/
|
||||
uri: https://api.kestra.io/v1/reports/server-events
|
||||
initial-delay: 5m
|
||||
fixed-delay: 1h
|
||||
|
||||
|
||||
@@ -4,11 +4,11 @@ import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junitpioneer.jupiter.RetryingTest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
@@ -19,7 +19,6 @@ import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static io.kestra.core.utils.Rethrow.throwRunnable;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@@ -57,10 +56,11 @@ class FileChangedEventListenerTest {
|
||||
}
|
||||
}
|
||||
|
||||
@RetryingTest(5) // Flaky on CI but always pass locally
|
||||
@Test
|
||||
void test() throws IOException, TimeoutException {
|
||||
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test");
|
||||
// remove the flow if it already exists
|
||||
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
|
||||
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "myflow").ifPresent(flow -> flowRepository.delete(flow));
|
||||
|
||||
// create a basic flow
|
||||
String flow = """
|
||||
@@ -73,14 +73,14 @@ class FileChangedEventListenerTest {
|
||||
message: Hello World! 🚀
|
||||
""";
|
||||
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, flow);
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, flow);
|
||||
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), flow.getBytes());
|
||||
Await.until(
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isPresent(),
|
||||
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isPresent(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
Flow myflow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").orElseThrow();
|
||||
Flow myflow = flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").orElseThrow();
|
||||
assertThat(myflow.getTasks()).hasSize(1);
|
||||
assertThat(myflow.getTasks().getFirst().getId()).isEqualTo("hello");
|
||||
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
|
||||
@@ -88,16 +88,17 @@ class FileChangedEventListenerTest {
|
||||
// delete the flow
|
||||
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
|
||||
Await.until(
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isEmpty(),
|
||||
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "myflow").isEmpty(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
}
|
||||
|
||||
@RetryingTest(5) // Flaky on CI but always pass locally
|
||||
@Test
|
||||
void testWithPluginDefault() throws IOException, TimeoutException {
|
||||
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");
|
||||
// remove the flow if it already exists
|
||||
flowRepository.findByIdWithSource(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
|
||||
flowRepository.findByIdWithSource(tenant, "io.kestra.tests.watch", "pluginDefault").ifPresent(flow -> flowRepository.delete(flow));
|
||||
|
||||
// create a flow with plugin default
|
||||
String pluginDefault = """
|
||||
@@ -113,14 +114,14 @@ class FileChangedEventListenerTest {
|
||||
values:
|
||||
message: Hello World!
|
||||
""";
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, pluginDefault);
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(tenant, pluginDefault);
|
||||
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), pluginDefault.getBytes());
|
||||
Await.until(
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isPresent(),
|
||||
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isPresent(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
Flow pluginDefaultFlow = flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
|
||||
Flow pluginDefaultFlow = flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
|
||||
assertThat(pluginDefaultFlow.getTasks()).hasSize(1);
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getId()).isEqualTo("helloWithDefault");
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
|
||||
@@ -128,7 +129,7 @@ class FileChangedEventListenerTest {
|
||||
// delete both files
|
||||
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
|
||||
Await.until(
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
|
||||
() -> flowRepository.findById(tenant, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
|
||||
Duration.ofMillis(100),
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
|
||||
@@ -42,10 +42,10 @@ dependencies {
|
||||
|
||||
// plugins
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-impl'
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-supplier'
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-supplier-mvn3'
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-connector-basic'
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-transport-file'
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-transport-http'
|
||||
implementation 'org.apache.maven.resolver:maven-resolver-transport-apache'
|
||||
|
||||
// scheduler
|
||||
implementation group: 'com.cronutils', name: 'cron-utils'
|
||||
@@ -63,6 +63,10 @@ dependencies {
|
||||
exclude group: 'com.fasterxml.jackson.core'
|
||||
}
|
||||
|
||||
// micrometer
|
||||
implementation "io.micronaut.micrometer:micronaut-micrometer-observation"
|
||||
implementation 'io.micrometer:micrometer-java21'
|
||||
|
||||
// test
|
||||
testAnnotationProcessor project(':processor')
|
||||
testImplementation project(':tests')
|
||||
@@ -70,6 +74,9 @@ dependencies {
|
||||
testImplementation project(':repository-memory')
|
||||
testImplementation project(':runner-memory')
|
||||
testImplementation project(':storage-local')
|
||||
testImplementation project(':worker')
|
||||
testImplementation project(':scheduler')
|
||||
testImplementation project(':executor')
|
||||
|
||||
testImplementation "io.micronaut:micronaut-http-client"
|
||||
testImplementation "io.micronaut:micronaut-http-server-netty"
|
||||
|
||||
@@ -3,30 +3,88 @@ package io.kestra.core.events;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.context.ServerRequestContext;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
@AllArgsConstructor
|
||||
import java.util.Objects;
|
||||
|
||||
@Getter
|
||||
public class CrudEvent<T> {
|
||||
T model;
|
||||
private final T model;
|
||||
@Nullable
|
||||
T previousModel;
|
||||
CrudEventType type;
|
||||
HttpRequest<?> request;
|
||||
|
||||
private final T previousModel;
|
||||
private final CrudEventType type;
|
||||
private final HttpRequest<?> request;
|
||||
|
||||
/**
|
||||
* Static helper method for creating a new {@link CrudEventType#UPDATE} CrudEvent.
|
||||
*
|
||||
* @param model the new created model.
|
||||
* @param <T> type of the model.
|
||||
* @return the new {@link CrudEvent}.
|
||||
*/
|
||||
public static <T> CrudEvent<T> create(T model) {
|
||||
Objects.requireNonNull(model, "Can't create CREATE event with a null model");
|
||||
return new CrudEvent<>(model, null, CrudEventType.CREATE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Static helper method for creating a new {@link CrudEventType#DELETE} CrudEvent.
|
||||
*
|
||||
* @param model the deleted model.
|
||||
* @param <T> type of the model.
|
||||
* @return the new {@link CrudEvent}.
|
||||
*/
|
||||
public static <T> CrudEvent<T> delete(T model) {
|
||||
Objects.requireNonNull(model, "Can't create DELETE event with a null model");
|
||||
return new CrudEvent<>(null, model, CrudEventType.DELETE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Static helper method for creating a new CrudEvent.
|
||||
*
|
||||
* @param before the model before the update.
|
||||
* @param after the model after the update.
|
||||
* @param <T> type of the model.
|
||||
* @return the new {@link CrudEvent}.
|
||||
*/
|
||||
public static <T> CrudEvent<T> of(T before, T after) {
|
||||
|
||||
if (before == null && after == null) {
|
||||
throw new IllegalArgumentException("Both before and after cannot be null");
|
||||
}
|
||||
|
||||
if (before == null) {
|
||||
return create(after);
|
||||
}
|
||||
|
||||
if (after == null) {
|
||||
return delete(before);
|
||||
}
|
||||
|
||||
return new CrudEvent<>(after, before, CrudEventType.UPDATE);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use the static factory methods.
|
||||
*/
|
||||
@Deprecated
|
||||
public CrudEvent(T model, CrudEventType type) {
|
||||
this.model = model;
|
||||
this.type = type;
|
||||
this.previousModel = null;
|
||||
this.request = ServerRequestContext.currentRequest().orElse(null);
|
||||
this(
|
||||
CrudEventType.DELETE.equals(type) ? null : model,
|
||||
CrudEventType.DELETE.equals(type) ? model : null,
|
||||
type,
|
||||
ServerRequestContext.currentRequest().orElse(null)
|
||||
);
|
||||
}
|
||||
|
||||
public CrudEvent(T model, T previousModel, CrudEventType type) {
|
||||
this(model, previousModel, type, ServerRequestContext.currentRequest().orElse(null));
|
||||
}
|
||||
|
||||
public CrudEvent(T model, T previousModel, CrudEventType type, HttpRequest<?> request) {
|
||||
this.model = model;
|
||||
this.previousModel = previousModel;
|
||||
this.type = type;
|
||||
this.request = ServerRequestContext.currentRequest().orElse(null);
|
||||
this.request = request;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -6,16 +6,24 @@ import io.kestra.core.http.HttpRequest;
|
||||
import io.kestra.core.http.HttpResponse;
|
||||
import io.kestra.core.http.client.apache.*;
|
||||
import io.kestra.core.http.client.configurations.HttpConfiguration;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.micrometer.common.KeyValues;
|
||||
import io.micrometer.core.instrument.binder.httpcomponents.hc5.ApacheHttpClientContext;
|
||||
import io.micrometer.core.instrument.binder.httpcomponents.hc5.DefaultApacheHttpClientObservationConvention;
|
||||
import io.micrometer.core.instrument.binder.httpcomponents.hc5.ObservationExecChainHandler;
|
||||
import io.micrometer.observation.ObservationRegistry;
|
||||
import io.micronaut.http.MediaType;
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.Builder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hc.client5.http.ContextBuilder;
|
||||
import org.apache.hc.client5.http.auth.*;
|
||||
import org.apache.hc.client5.http.config.ConnectionConfig;
|
||||
import org.apache.hc.client5.http.impl.ChainElement;
|
||||
import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy;
|
||||
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
|
||||
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
|
||||
@@ -30,7 +38,6 @@ import org.apache.hc.core5.http.io.HttpClientResponseHandler;
|
||||
import org.apache.hc.core5.http.io.entity.EntityUtils;
|
||||
import org.apache.hc.core5.ssl.SSLContexts;
|
||||
import org.apache.hc.core5.util.Timeout;
|
||||
import org.codehaus.plexus.util.StringUtils;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
@@ -50,11 +57,16 @@ public class HttpClient implements Closeable {
|
||||
private transient CloseableHttpClient client;
|
||||
private final RunContext runContext;
|
||||
private final HttpConfiguration configuration;
|
||||
private ObservationRegistry observationRegistry;
|
||||
|
||||
@Builder
|
||||
public HttpClient(RunContext runContext, @Nullable HttpConfiguration configuration) throws IllegalVariableEvaluationException {
|
||||
this.runContext = runContext;
|
||||
this.configuration = configuration == null ? HttpConfiguration.builder().build() : configuration;
|
||||
if (runContext instanceof DefaultRunContext defaultRunContext) {
|
||||
this.observationRegistry = defaultRunContext.getApplicationContext().findBean(ObservationRegistry.class).orElse(null);
|
||||
}
|
||||
|
||||
this.client = this.createClient();
|
||||
}
|
||||
|
||||
@@ -67,6 +79,13 @@ public class HttpClient implements Closeable {
|
||||
.disableDefaultUserAgent()
|
||||
.setUserAgent("Kestra");
|
||||
|
||||
if (observationRegistry != null) {
|
||||
// micrometer, must be placed before the retry strategy (see https://docs.micrometer.io/micrometer/reference/reference/httpcomponents.html#_retry_strategy_considerations)
|
||||
builder.addExecInterceptorAfter(ChainElement.RETRY.name(), "micrometer",
|
||||
new ObservationExecChainHandler(observationRegistry, new CustomApacheHttpClientObservationConvention())
|
||||
);
|
||||
}
|
||||
|
||||
// logger
|
||||
if (this.configuration.getLogs() != null && this.configuration.getLogs().length > 0) {
|
||||
if (ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.REQUEST_HEADERS) ||
|
||||
@@ -297,4 +316,14 @@ public class HttpClient implements Closeable {
|
||||
this.client.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static class CustomApacheHttpClientObservationConvention extends DefaultApacheHttpClientObservationConvention {
|
||||
@Override
|
||||
public KeyValues getLowCardinalityKeyValues(ApacheHttpClientContext context) {
|
||||
return KeyValues.concat(
|
||||
super.getLowCardinalityKeyValues(context),
|
||||
KeyValues.of("type", "core-client")
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
package io.kestra.core.metrics;
|
||||
|
||||
import io.micrometer.core.instrument.binder.jvm.JvmThreadDeadlockMetrics;
|
||||
import io.micrometer.java21.instrument.binder.jdk.VirtualThreadMetrics;
|
||||
import io.micronaut.configuration.metrics.annotation.RequiresMetrics;
|
||||
import io.micronaut.context.annotation.Bean;
|
||||
import io.micronaut.context.annotation.Factory;
|
||||
import io.micronaut.context.annotation.Primary;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import static io.micronaut.configuration.metrics.micrometer.MeterRegistryFactory.MICRONAUT_METRICS_BINDERS;
|
||||
import static io.micronaut.core.util.StringUtils.FALSE;
|
||||
|
||||
@Factory
|
||||
@RequiresMetrics
|
||||
|
||||
public class MeterRegistryBinderFactory {
|
||||
@Bean
|
||||
@Primary
|
||||
@Singleton
|
||||
@Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
|
||||
public VirtualThreadMetrics virtualThreadMetrics() {
|
||||
return new VirtualThreadMetrics();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@Primary
|
||||
@Singleton
|
||||
@Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
|
||||
public JvmThreadDeadlockMetrics threadDeadlockMetricsMetrics() {
|
||||
return new JvmThreadDeadlockMetrics();
|
||||
}
|
||||
}
|
||||
@@ -6,7 +6,6 @@ import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.core.schedulers.SchedulerExecutionWithTrigger;
|
||||
import io.micrometer.core.instrument.*;
|
||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
||||
import io.micrometer.core.instrument.search.Search;
|
||||
@@ -395,19 +394,6 @@ public class MetricRegistry {
|
||||
return triggerContext.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerContext.getTenantId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return tags for current {@link SchedulerExecutionWithTrigger}.
|
||||
*
|
||||
* @param schedulerExecutionWithTrigger the current SchedulerExecutionWithTrigger
|
||||
* @return tags to apply to metrics
|
||||
*/
|
||||
public String[] tags(SchedulerExecutionWithTrigger schedulerExecutionWithTrigger, String... tags) {
|
||||
return ArrayUtils.addAll(
|
||||
this.tags(schedulerExecutionWithTrigger.getExecution()),
|
||||
tags
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return tags for current {@link ExecutionKilled}
|
||||
*
|
||||
|
||||
@@ -1,16 +1,33 @@
|
||||
package io.kestra.core.models;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Interface that can be implemented by classes supporting plugin versioning.
|
||||
*
|
||||
* @see Plugin
|
||||
*/
|
||||
public interface PluginVersioning {
|
||||
|
||||
String TITLE = "Plugin Version";
|
||||
String DESCRIPTION = """
|
||||
Defines the version of the plugin to use.
|
||||
|
||||
@Pattern(regexp="\\d+\\.\\d+\\.\\d+(-[a-zA-Z0-9-]+)?|([a-zA-Z0-9]+)")
|
||||
@Schema(title = "The version of the plugin to use.")
|
||||
The version must follow the Semantic Versioning (SemVer) specification:
|
||||
- A single-digit MAJOR version (e.g., `1`).
|
||||
- A MAJOR.MINOR version (e.g., `1.1`).
|
||||
- A MAJOR.MINOR.PATCH version, optionally with any qualifier
|
||||
(e.g., `1.1.2`, `1.1.0-SNAPSHOT`).
|
||||
""";
|
||||
|
||||
@Schema(
|
||||
title = TITLE,
|
||||
description = DESCRIPTION
|
||||
)
|
||||
String getVersion();
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package io.kestra.core.models.triggers.multipleflows;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.triggers.TimeWindow;
|
||||
import io.kestra.core.utils.Rethrow;
|
||||
import org.slf4j.Logger;
|
||||
@@ -24,7 +23,7 @@ public interface MultipleCondition extends Rethrow.PredicateChecked<ConditionCon
|
||||
|
||||
/**
|
||||
* This conditions will only validate previously calculated value on
|
||||
* {@link io.kestra.core.services.FlowTriggerService#computeExecutionsFromFlowTriggers(Execution, List, Optional)}} and {@link MultipleConditionStorageInterface#save(List)} by the executor.
|
||||
* io.kestra.executor.FlowTriggerService#computeExecutionsFromFlowTriggers(Execution, List, Optional) and {@link MultipleConditionStorageInterface#save(List)} by the executor.
|
||||
* The real validation is done here.
|
||||
*/
|
||||
@Override
|
||||
|
||||
@@ -43,7 +43,7 @@ public class DefaultPluginRegistry implements PluginRegistry {
|
||||
static final DefaultPluginRegistry INSTANCE = new DefaultPluginRegistry();
|
||||
}
|
||||
|
||||
private final Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> pluginClassByIdentifier = new ConcurrentHashMap<>();
|
||||
protected final Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> pluginClassByIdentifier = new ConcurrentHashMap<>();
|
||||
private final Map<PluginBundleIdentifier, RegisteredPlugin> plugins = new ConcurrentHashMap<>();
|
||||
private final PluginScanner scanner = new PluginScanner(DefaultPluginRegistry.class.getClassLoader());
|
||||
private final AtomicBoolean initialized = new AtomicBoolean(false);
|
||||
@@ -56,7 +56,7 @@ public class DefaultPluginRegistry implements PluginRegistry {
|
||||
*
|
||||
* @return the {@link DefaultPluginRegistry}.
|
||||
*/
|
||||
public static DefaultPluginRegistry getOrCreate() {
|
||||
public synchronized static DefaultPluginRegistry getOrCreate() {
|
||||
DefaultPluginRegistry instance = LazyHolder.INSTANCE;
|
||||
if (!instance.isInitialized()) {
|
||||
instance.init();
|
||||
@@ -74,7 +74,7 @@ public class DefaultPluginRegistry implements PluginRegistry {
|
||||
/**
|
||||
* Initializes the registry by loading all core plugins.
|
||||
*/
|
||||
protected void init() {
|
||||
protected synchronized void init() {
|
||||
if (initialized.compareAndSet(false, true)) {
|
||||
register(scanner.scan());
|
||||
}
|
||||
@@ -103,11 +103,13 @@ public class DefaultPluginRegistry implements PluginRegistry {
|
||||
*/
|
||||
@Override
|
||||
public void registerIfAbsent(final Path pluginPath) {
|
||||
long start = System.currentTimeMillis();
|
||||
if (isPluginPathValid(pluginPath) && !isPluginPathScanned(pluginPath)) {
|
||||
List<RegisteredPlugin> scanned = scanner.scan(pluginPath);
|
||||
scanned.forEach(this::register);
|
||||
scannedPluginPaths.add(pluginPath);
|
||||
}
|
||||
log.debug("Registered if absent plugins from path {} in {} ms", pluginPath, System.currentTimeMillis() - start);
|
||||
}
|
||||
|
||||
private boolean isPluginPathScanned(final Path pluginPath) {
|
||||
@@ -119,10 +121,12 @@ public class DefaultPluginRegistry implements PluginRegistry {
|
||||
*/
|
||||
@Override
|
||||
public void register(final Path pluginPath) {
|
||||
long start = System.currentTimeMillis();
|
||||
if (isPluginPathValid(pluginPath)) {
|
||||
List<RegisteredPlugin> scanned = scanner.scan(pluginPath);
|
||||
scanned.forEach(this::register);
|
||||
}
|
||||
log.debug("Registered plugins from path {} in {} ms", pluginPath, System.currentTimeMillis() - start);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -191,21 +195,28 @@ public class DefaultPluginRegistry implements PluginRegistry {
|
||||
*/
|
||||
public void register(final RegisteredPlugin plugin) {
|
||||
final PluginBundleIdentifier identifier = PluginBundleIdentifier.of(plugin);
|
||||
|
||||
// Skip registration if plugin-bundle already exists in the registry.
|
||||
if (containsPluginBundle(identifier)) {
|
||||
return;
|
||||
// Skip registration if the same plugin already exists in the registry.
|
||||
final RegisteredPlugin existing = plugins.get(identifier);
|
||||
if (existing != null && existing.crc32() == plugin.crc32()) {
|
||||
return; // same plugin already registered
|
||||
}
|
||||
|
||||
lock.lock();
|
||||
try {
|
||||
if (existing != null) {
|
||||
unregister(List.of(existing));
|
||||
}
|
||||
plugins.put(PluginBundleIdentifier.of(plugin), plugin);
|
||||
pluginClassByIdentifier.putAll(getPluginClassesByIdentifier(plugin));
|
||||
registerAll(getPluginClassesByIdentifier(plugin));
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
protected void registerAll(Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> plugins) {
|
||||
pluginClassByIdentifier.putAll(plugins);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> getPluginClassesByIdentifier(final RegisteredPlugin plugin) {
|
||||
Map<PluginIdentifier, PluginClassAndMetadata<? extends Plugin>> classes = new HashMap<>();
|
||||
|
||||
@@ -6,6 +6,12 @@ import lombok.Getter;
|
||||
import lombok.ToString;
|
||||
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Enumeration;
|
||||
import java.util.jar.JarEntry;
|
||||
import java.util.jar.JarFile;
|
||||
import java.util.zip.CRC32;
|
||||
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
@@ -14,4 +20,59 @@ import java.net.URL;
|
||||
public class ExternalPlugin {
|
||||
private final URL location;
|
||||
private final URL[] resources;
|
||||
private volatile Long crc32; // lazy-val
|
||||
|
||||
public ExternalPlugin(URL location, URL[] resources) {
|
||||
this.location = location;
|
||||
this.resources = resources;
|
||||
}
|
||||
|
||||
public Long getCrc32() {
|
||||
if (this.crc32 == null) {
|
||||
synchronized (this) {
|
||||
if (this.crc32 == null) {
|
||||
this.crc32 = computeJarCrc32(location);
|
||||
}
|
||||
}
|
||||
}
|
||||
return crc32;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute a CRC32 of the JAR File without reading the whole file
|
||||
*
|
||||
* @param location of the JAR File.
|
||||
* @return the CRC32 of {@code -1} if the checksum can't be computed.
|
||||
*/
|
||||
private static long computeJarCrc32(final URL location) {
|
||||
CRC32 crc = new CRC32();
|
||||
try (JarFile jar = new JarFile(location.toURI().getPath(), false)) {
|
||||
Enumeration<JarEntry> entries = jar.entries();
|
||||
byte[] buffer = new byte[Long.BYTES]; // reusable buffer to avoid re-allocation
|
||||
|
||||
while (entries.hasMoreElements()) {
|
||||
JarEntry entry = entries.nextElement();
|
||||
crc.update(entry.getName().getBytes(StandardCharsets.UTF_8));
|
||||
updateCrc32WithLong(crc, buffer, entry.getSize());
|
||||
updateCrc32WithLong(crc, buffer, entry.getCrc());
|
||||
}
|
||||
|
||||
return crc.getValue();
|
||||
} catch (Exception e) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
private static void updateCrc32WithLong(CRC32 crc32, byte[] reusable, long val) {
|
||||
// fast long -> byte conversion
|
||||
reusable[0] = (byte) (val >>> 56);
|
||||
reusable[1] = (byte) (val >>> 48);
|
||||
reusable[2] = (byte) (val >>> 40);
|
||||
reusable[3] = (byte) (val >>> 32);
|
||||
reusable[4] = (byte) (val >>> 24);
|
||||
reusable[5] = (byte) (val >>> 16);
|
||||
reusable[6] = (byte) (val >>> 8);
|
||||
reusable[7] = (byte) val;
|
||||
crc32.update(reusable);;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,6 +46,7 @@ public class PluginClassLoader extends URLClassLoader {
|
||||
+ "|dev.failsafe"
|
||||
+ "|reactor"
|
||||
+ "|io.opentelemetry"
|
||||
+ "|io.netty"
|
||||
+ ")\\..*$");
|
||||
|
||||
private final ClassLoader parent;
|
||||
|
||||
@@ -2,10 +2,14 @@ package io.kestra.core.plugins;
|
||||
|
||||
import io.kestra.core.models.Plugin;
|
||||
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.zip.CRC32;
|
||||
import java.util.zip.Checksum;
|
||||
|
||||
/**
|
||||
* Registry for managing all Kestra's {@link Plugin}.
|
||||
@@ -123,4 +127,24 @@ public interface PluginRegistry {
|
||||
* @return {@code true} if supported. Otherwise {@code false}.
|
||||
*/
|
||||
boolean isVersioningSupported();
|
||||
|
||||
/**
|
||||
* Computes a CRC32 hash value representing the current content of the plugin registry.
|
||||
*
|
||||
* @return a {@code long} containing the CRC32 checksum value, serving as a compact
|
||||
* representation of the registry's content
|
||||
*/
|
||||
default long hash() {
|
||||
Checksum crc32 = new CRC32();
|
||||
|
||||
for (RegisteredPlugin plugin : plugins()) {
|
||||
Optional.ofNullable(plugin.getExternalPlugin())
|
||||
.map(ExternalPlugin::getCrc32)
|
||||
.ifPresent(checksum -> {
|
||||
byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(checksum).array();
|
||||
crc32.update(bytes, 0, bytes.length);
|
||||
});
|
||||
}
|
||||
return crc32.getValue();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,11 +5,15 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import java.io.IOException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.DirectoryStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.InvalidPathException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
import java.util.jar.JarEntry;
|
||||
import java.util.jar.JarFile;
|
||||
import java.util.zip.CRC32;
|
||||
|
||||
@Slf4j
|
||||
public class PluginResolver {
|
||||
@@ -119,4 +123,5 @@ public class PluginResolver {
|
||||
|
||||
return urls;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -308,6 +308,10 @@ public class RegisteredPlugin {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public long crc32() {
|
||||
return Optional.ofNullable(externalPlugin).map(ExternalPlugin::getCrc32).orElse(-1L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
|
||||
@@ -83,7 +83,9 @@ public class LocalFlowRepositoryLoader {
|
||||
}
|
||||
|
||||
public void load(String tenantId, File basePath) throws IOException {
|
||||
Map<String, FlowInterface> flowByUidInRepository = flowRepository.findAllForAllTenants().stream()
|
||||
Map<String, FlowInterface> flowByUidInRepository = flowRepository.findAllForAllTenants()
|
||||
.stream()
|
||||
.filter(flow -> tenantId.equals(flow.getTenantId()))
|
||||
.collect(Collectors.toMap(FlowId::uidWithoutRevision, Function.identity()));
|
||||
|
||||
try (Stream<Path> pathStream = Files.walk(basePath.toPath())) {
|
||||
|
||||
@@ -11,6 +11,10 @@ import lombok.Getter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
// TODO for 2.0: this class is used as a queue consumer (which should have been the ExecutorInterface instead),
|
||||
// a queue message (only in Kafka) and an execution context.
|
||||
// At some point, we should rename it to ExecutorContext and move it to the executor module,
|
||||
// then rename the ExecutorInterface to just Executor (to be used as a queue consumer)
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public class Executor {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import java.io.Closeable;
|
||||
import io.kestra.core.server.Service;
|
||||
|
||||
public interface ExecutorInterface extends Closeable, Runnable {
|
||||
public interface ExecutorInterface extends Service, Runnable {
|
||||
|
||||
}
|
||||
|
||||
@@ -382,6 +382,15 @@ public class FlowInputOutput {
|
||||
.stream()
|
||||
.collect(HashMap::new, (m, v) -> m.put(v.getKey(), v.getValue().value()), HashMap::putAll)
|
||||
);
|
||||
// Hack: Pre-inject all inputs that have a default value with 'null' to prevent
|
||||
// RunContextFactory from attempting to render them when absent, which could
|
||||
// otherwise cause an exception if a Pebble expression is involved.
|
||||
List<Input<?>> inputs = Optional.ofNullable(flow).map(FlowInterface::getInputs).orElse(List.of());
|
||||
for (Input<?> input : inputs) {
|
||||
if (input.getDefaults() != null && !flattenInputs.containsKey(input.getId())) {
|
||||
flattenInputs.put(input.getId(), null);
|
||||
}
|
||||
}
|
||||
return runContextFactory.of(flow, execution, vars -> vars.withInputs(flattenInputs));
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
package io.kestra.core.runners;
|
||||
// NOTE: this class is not used anymore but must be kept as it is used in as queue consumer both in JDBC and Kafka
|
||||
public class Indexer {
|
||||
|
||||
import io.kestra.core.server.Service;
|
||||
|
||||
public interface Indexer extends Service, Runnable {
|
||||
|
||||
}
|
||||
|
||||
@@ -20,9 +20,11 @@ import io.kestra.core.queues.QueueInterface;
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.function.Supplier;
|
||||
@@ -142,8 +144,9 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
||||
}
|
||||
|
||||
public void usedSecret(String secret) {
|
||||
if (secret != null) {
|
||||
if (secret != null && !secret.isEmpty()) {
|
||||
this.useSecrets.add(secret);
|
||||
this.useSecrets.add(Base64.getEncoder().encodeToString(secret.getBytes(StandardCharsets.UTF_8)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.flows.input.SecretInput;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
@@ -282,15 +283,15 @@ public final class RunVariables {
|
||||
|
||||
if (flow != null && flow.getInputs() != null) {
|
||||
// we add default inputs value from the flow if not already set, this will be useful for triggers
|
||||
flow.getInputs().stream()
|
||||
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
|
||||
.forEach(input -> {
|
||||
try {
|
||||
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
throw new RuntimeException("Unable to inject default value for input '" + input.getId() + "'", e);
|
||||
}
|
||||
});
|
||||
flow.getInputs().stream()
|
||||
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
|
||||
.forEach(input -> {
|
||||
try {
|
||||
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
// Silent catch, if an input depends on another input, or a variable that is populated at runtime / input filling time, we can't resolve it here.
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (!inputs.isEmpty()) {
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
public interface RunnerInterface {
|
||||
void run();
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package io.kestra.core.schedulers;
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@@ -2,6 +2,5 @@ package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.server.Service;
|
||||
|
||||
public interface IndexerInterface extends Service, Runnable {
|
||||
|
||||
public interface Scheduler extends Service, Runnable {
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package io.kestra.core.schedulers;
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
File diff suppressed because it is too large
Load Diff
@@ -5,6 +5,8 @@ import io.kestra.core.http.HttpRequest;
|
||||
import io.kestra.core.http.HttpResponse;
|
||||
import io.kestra.core.http.client.HttpClient;
|
||||
import io.kestra.core.http.client.HttpClientException;
|
||||
import io.kestra.core.http.client.HttpClientRequestException;
|
||||
import io.kestra.core.http.client.HttpClientResponseException;
|
||||
import io.kestra.core.http.client.configurations.HttpConfiguration;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
@@ -101,8 +103,15 @@ public class HttpFunction<T> implements Function {
|
||||
try (HttpClient httpClient = new HttpClient(runContext, httpConfiguration)) {
|
||||
HttpResponse<Object> response = httpClient.request(httpRequest, Object.class);
|
||||
return response.getBody();
|
||||
} catch (HttpClientException | IllegalVariableEvaluationException | IOException e) {
|
||||
throw new PebbleException(e, "Unable to execute HTTP request", lineNumber, self.getName());
|
||||
} catch (HttpClientResponseException e) {
|
||||
if (e.getResponse() != null) {
|
||||
String msg = "Failed to execute HTTP Request, server respond with status " + e.getResponse().getStatus().getCode() + " : " + e.getResponse().getStatus().getReason();
|
||||
throw new PebbleException(e, msg , lineNumber, self.getName());
|
||||
} else {
|
||||
throw new PebbleException( e, "Failed to execute HTTP request ", lineNumber, self.getName());
|
||||
}
|
||||
} catch(HttpClientException | IllegalVariableEvaluationException | IOException e ) {
|
||||
throw new PebbleException( e, "Failed to execute HTTP request ", lineNumber, self.getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +0,0 @@
|
||||
package io.kestra.core.schedulers;
|
||||
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@SuppressWarnings("try")
|
||||
@Singleton
|
||||
public interface Scheduler extends Runnable, AutoCloseable {
|
||||
|
||||
}
|
||||
@@ -180,23 +180,13 @@ public final class FileSerde {
|
||||
}
|
||||
|
||||
private static <T> MappingIterator<T> createMappingIterator(ObjectMapper objectMapper, Reader reader, TypeReference<T> type) throws IOException {
|
||||
// See https://github.com/FasterXML/jackson-dataformats-binary/issues/493
|
||||
// There is a limitation with the MappingIterator that cannot differentiate between an array of things (of whatever shape)
|
||||
// and a sequence/stream of things (of Array shape).
|
||||
// To work around that, we need to create a JsonParser and advance to the first token.
|
||||
try (var parser = objectMapper.createParser(reader)) {
|
||||
parser.nextToken();
|
||||
return objectMapper.readerFor(type).readValues(parser);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> MappingIterator<T> createMappingIterator(ObjectMapper objectMapper, Reader reader, Class<T> type) throws IOException {
|
||||
// See https://github.com/FasterXML/jackson-dataformats-binary/issues/493
|
||||
// There is a limitation with the MappingIterator that cannot differentiate between an array of things (of whatever shape)
|
||||
// and a sequence/stream of things (of Array shape).
|
||||
// To work around that, we need to create a JsonParser and advance to the first token.
|
||||
try (var parser = objectMapper.createParser(reader)) {
|
||||
parser.nextToken();
|
||||
return objectMapper.readerFor(type).readValues(parser);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -172,22 +172,19 @@ public final class JacksonMapper {
|
||||
|
||||
return Pair.of(patchPrevToNew, patchNewToPrev);
|
||||
}
|
||||
|
||||
public static String applyPatches(Object object, List<JsonNode> patches) throws JsonProcessingException {
|
||||
|
||||
public static JsonNode applyPatchesOnJsonNode(JsonNode jsonObject, List<JsonNode> patches) {
|
||||
for (JsonNode patch : patches) {
|
||||
try {
|
||||
// Required for ES
|
||||
if (patch.findValue("value") == null) {
|
||||
((ObjectNode) patch.get(0)).set("value", (JsonNode) null);
|
||||
((ObjectNode) patch.get(0)).set("value", null);
|
||||
}
|
||||
JsonNode current = MAPPER.valueToTree(object);
|
||||
object = JsonPatch.fromJson(patch).apply(current);
|
||||
jsonObject = JsonPatch.fromJson(patch).apply(jsonObject);
|
||||
} catch (IOException | JsonPatchException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return MAPPER.writeValueAsString(object);
|
||||
return jsonObject;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -171,7 +171,7 @@ public abstract class AbstractServiceLivenessCoordinator extends AbstractService
|
||||
protected void handleAllServiceInNotRunningState() {
|
||||
// Soft delete all services which are NOT_RUNNING anymore.
|
||||
store.findAllInstancesInStates(Set.of(Service.ServiceState.NOT_RUNNING))
|
||||
.forEach(instance -> safelyUpdate(instance, Service.ServiceState.EMPTY, null));
|
||||
.forEach(instance -> safelyUpdate(instance, Service.ServiceState.INACTIVE, null));
|
||||
}
|
||||
|
||||
protected void handleAllServicesForTerminatedStates(final Instant now) {
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
package io.kestra.core.server;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import io.kestra.core.utils.Enums;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
@@ -106,12 +108,12 @@ public interface Service extends AutoCloseable {
|
||||
* |
|
||||
* v
|
||||
* +------+-------+
|
||||
* | Empty (8) |
|
||||
* | Inactive (8) |
|
||||
* +------+-------+
|
||||
* </pre>
|
||||
*/
|
||||
enum ServiceState {
|
||||
CREATED(1, 2, 3, 4, 9), // 0
|
||||
CREATED(1, 2, 3, 4, 9), // 0
|
||||
RUNNING(2, 3, 4, 9), // 1
|
||||
ERROR(4), // 2
|
||||
DISCONNECTED(4, 7), // 3
|
||||
@@ -119,14 +121,24 @@ public interface Service extends AutoCloseable {
|
||||
TERMINATED_GRACEFULLY(7), // 5
|
||||
TERMINATED_FORCED(7), // 6
|
||||
NOT_RUNNING(8), // 7
|
||||
EMPTY(), // 8 FINAL STATE
|
||||
MAINTENANCE(1, 2, 3, 4); // 9
|
||||
INACTIVE(), // 8 FINAL STATE
|
||||
MAINTENANCE(1, 2, 3, 4); // 9
|
||||
|
||||
private final Set<Integer> validTransitions = new HashSet<>();
|
||||
|
||||
ServiceState(final Integer... validTransitions) {
|
||||
this.validTransitions.addAll(Arrays.asList(validTransitions));
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public static ServiceState fromString(final String value) {
|
||||
try {
|
||||
// EMPTY state was renamed to INACTIVE in Kestra 1.0
|
||||
return Enums.getForNameIgnoreCase(value, ServiceState.class, Map.of("EMPTY", INACTIVE));
|
||||
} catch (IllegalArgumentException e) {
|
||||
return INACTIVE;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isValidTransition(final ServiceState newState) {
|
||||
return validTransitions.contains(newState.ordinal()) || equals(newState);
|
||||
@@ -145,7 +157,7 @@ public interface Service extends AutoCloseable {
|
||||
return equals(TERMINATED_GRACEFULLY)
|
||||
|| equals(TERMINATED_FORCED)
|
||||
|| equals(NOT_RUNNING)
|
||||
|| equals(EMPTY);
|
||||
|| equals(INACTIVE);
|
||||
}
|
||||
|
||||
public static Set<ServiceState> allRunningStates() {
|
||||
|
||||
@@ -392,7 +392,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
|
||||
// More especially, this handles the case where a WORKER is configured with a short gracefulTerminationPeriod
|
||||
// and the JVM was unresponsive for more than this period.
|
||||
// In this context, the worker's tasks have already been resubmitted by the executor; the worker must therefore stop immediately.
|
||||
if (state.equals(Service.ServiceState.NOT_RUNNING) || state.equals(Service.ServiceState.EMPTY)) {
|
||||
if (state.equals(Service.ServiceState.NOT_RUNNING) || state.equals(Service.ServiceState.INACTIVE)) {
|
||||
service.skipGracefulTermination(true);
|
||||
}
|
||||
KestraContext.getContext().shutdown();
|
||||
|
||||
@@ -130,7 +130,7 @@ public class ConditionService {
|
||||
return this.conditionContext(runContext, flow, execution, null);
|
||||
}
|
||||
|
||||
boolean valid(FlowInterface flow, List<Condition> list, ConditionContext conditionContext) {
|
||||
public boolean valid(FlowInterface flow, List<Condition> list, ConditionContext conditionContext) {
|
||||
return list
|
||||
.stream()
|
||||
.allMatch(condition -> {
|
||||
|
||||
@@ -317,6 +317,32 @@ public class ExecutionService {
|
||||
return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
|
||||
}
|
||||
|
||||
public Execution changeTaskRunState(final Execution execution, Flow flow, String taskRunId, State.Type newState) throws Exception {
|
||||
Execution newExecution = markAs(execution, flow, taskRunId, newState);
|
||||
|
||||
// if the execution was terminated, it could have executed errors/finally/afterExecutions, we must remove them as the execution will be restarted
|
||||
if (execution.getState().isTerminated()) {
|
||||
List<TaskRun> newTaskRuns = newExecution.getTaskRunList();
|
||||
// We need to remove global error tasks and flowable error tasks if any
|
||||
flow
|
||||
.allErrorsWithChildren()
|
||||
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
|
||||
|
||||
// We need to remove global finally tasks and flowable error tasks if any
|
||||
flow
|
||||
.allFinallyWithChildren()
|
||||
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
|
||||
|
||||
// We need to remove afterExecution tasks
|
||||
ListUtils.emptyOnNull(flow.getAfterExecution())
|
||||
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
|
||||
|
||||
return newExecution.withTaskRunList(newTaskRuns);
|
||||
} else {
|
||||
return newExecution;
|
||||
}
|
||||
}
|
||||
|
||||
public Execution markAs(final Execution execution, FlowInterface flow, String taskRunId, State.Type newState) throws Exception {
|
||||
return this.markAs(execution, flow, taskRunId, newState, null, null);
|
||||
}
|
||||
|
||||
@@ -3,12 +3,7 @@ package io.kestra.core.services;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import io.kestra.core.exceptions.FlowProcessingException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowId;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.FlowWithException;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.models.flows.*;
|
||||
import io.kestra.core.models.tasks.RunnableTask;
|
||||
import io.kestra.core.models.topologies.FlowTopology;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
@@ -30,16 +25,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -408,7 +394,7 @@ public class FlowService {
|
||||
return latestFlows.values().stream().filter(flow -> !flow.isDeleted());
|
||||
}
|
||||
|
||||
protected boolean removeUnwanted(Flow f, Execution execution) {
|
||||
public boolean removeUnwanted(Flow f, Execution execution) {
|
||||
// we don't allow recursive
|
||||
return !f.uidWithoutRevision().equals(FlowId.uidWithoutRevision(execution));
|
||||
}
|
||||
@@ -551,23 +537,24 @@ public class FlowService {
|
||||
return expandAll ? recursiveFlowTopology(new ArrayList<>(), tenant, namespace, id, destinationOnly) : flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly).stream();
|
||||
}
|
||||
|
||||
private Stream<FlowTopology> recursiveFlowTopology(List<FlowId> flowIds, String tenantId, String namespace, String id, boolean destinationOnly) {
|
||||
private Stream<FlowTopology> recursiveFlowTopology(List<String> visitedTopologies, String tenantId, String namespace, String id, boolean destinationOnly) {
|
||||
if (flowTopologyRepository.isEmpty()) {
|
||||
throw noRepositoryException();
|
||||
}
|
||||
|
||||
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, id, destinationOnly);
|
||||
|
||||
FlowId flowId = FlowId.of(tenantId, namespace, id, null);
|
||||
if (flowIds.contains(flowId)) {
|
||||
return flowTopologies.stream();
|
||||
}
|
||||
flowIds.add(flowId);
|
||||
var flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, id, destinationOnly);
|
||||
|
||||
return flowTopologies.stream()
|
||||
.flatMap(topology -> Stream.of(topology.getDestination(), topology.getSource()))
|
||||
// recursively fetch child nodes
|
||||
.flatMap(node -> recursiveFlowTopology(flowIds, node.getTenantId(), node.getNamespace(), node.getId(), destinationOnly));
|
||||
// ignore already visited topologies
|
||||
.filter(x -> !visitedTopologies.contains(x.uid()))
|
||||
.flatMap(topology -> {
|
||||
visitedTopologies.add(topology.uid());
|
||||
Stream<FlowTopology> subTopologies = Stream
|
||||
.of(topology.getDestination(), topology.getSource())
|
||||
// recursively visit children and parents nodes
|
||||
.flatMap(relationNode -> recursiveFlowTopology(visitedTopologies, relationNode.getTenantId(), relationNode.getNamespace(), relationNode.getId(), destinationOnly));
|
||||
return Stream.concat(Stream.of(topology), subTopologies);
|
||||
});
|
||||
}
|
||||
|
||||
private IllegalStateException noRepositoryException() {
|
||||
|
||||
@@ -10,6 +10,7 @@ import jakarta.annotation.Nullable;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
@@ -112,7 +113,14 @@ public class InternalKVStore implements KVStore {
|
||||
@Override
|
||||
public boolean delete(String key) throws IOException {
|
||||
KVStore.validateKey(key);
|
||||
return this.storage.delete(this.tenant, this.namespace, this.storageUri(key));
|
||||
URI uri = this.storageUri(key);
|
||||
boolean deleted = this.storage.delete(this.tenant, this.namespace, uri);
|
||||
URI metadataURI = URI.create(uri.getPath() + ".metadata");
|
||||
if (this.storage.exists(this.tenant, this.namespace, metadataURI)){
|
||||
this.storage.delete(this.tenant, this.namespace, metadataURI);
|
||||
}
|
||||
return deleted;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -120,18 +128,32 @@ public class InternalKVStore implements KVStore {
|
||||
*/
|
||||
@Override
|
||||
public List<KVEntry> list() throws IOException {
|
||||
List<FileAttributes> list;
|
||||
try {
|
||||
list = this.storage.list(this.tenant, this.namespace, this.storageUri(null));
|
||||
} catch (FileNotFoundException e) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
List<FileAttributes> list = listAllFromStorage();
|
||||
return list.stream()
|
||||
.map(throwFunction(KVEntry::from))
|
||||
.filter(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isBefore(expirationDate)).orElse(true))
|
||||
.toList();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public List<KVEntry> listAll() throws IOException {
|
||||
List<FileAttributes> list = listAllFromStorage();
|
||||
return list.stream()
|
||||
.map(throwFunction(KVEntry::from))
|
||||
.toList();
|
||||
}
|
||||
|
||||
private List<FileAttributes> listAllFromStorage() throws IOException {
|
||||
try {
|
||||
return this.storage.list(this.tenant, this.namespace, this.storageUri(null));
|
||||
} catch (FileNotFoundException e) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
package io.kestra.core.storages.kv;
|
||||
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import jakarta.annotation.Nullable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Optional;
|
||||
|
||||
public record KVEntry(String key, String description, Instant creationDate, Instant updateDate, Instant expirationDate) {
|
||||
public record KVEntry(String key, @Nullable String description, Instant creationDate, Instant updateDate, @Nullable Instant expirationDate) {
|
||||
public static KVEntry from(FileAttributes fileAttributes) throws IOException {
|
||||
Optional<KVMetadata> kvMetadata = Optional.ofNullable(fileAttributes.getMetadata()).map(KVMetadata::new);
|
||||
return new KVEntry(
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
package io.kestra.core.storages.kv;
|
||||
|
||||
import io.kestra.core.exceptions.ResourceExpiredException;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import jakarta.annotation.Nullable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
@@ -80,6 +78,14 @@ public interface KVStore {
|
||||
*/
|
||||
List<KVEntry> list() throws IOException;
|
||||
|
||||
/**
|
||||
* Lists all the K/V store entries, expired or not.
|
||||
*
|
||||
* @return The list of all {@link KVEntry}.
|
||||
* @throws IOException if an error occurred while executing the operation on the K/V store.
|
||||
*/
|
||||
List<KVEntry> listAll() throws IOException;
|
||||
|
||||
/**
|
||||
* Finds the K/V store entry for the given key.
|
||||
*
|
||||
|
||||
@@ -18,6 +18,7 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
|
||||
import io.kestra.core.services.ConditionService;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import io.kestra.plugin.core.condition.*;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -175,9 +176,6 @@ public class FlowTopologyService {
|
||||
protected boolean isTriggerChild(Flow parent, Flow child) {
|
||||
List<AbstractTrigger> triggers = ListUtils.emptyOnNull(child.getTriggers());
|
||||
|
||||
// simulated execution: we add a "simulated" label so conditions can know that the evaluation is for a simulated execution
|
||||
Execution execution = Execution.newExecution(parent, (f, e) -> null, List.of(SIMULATED_EXECUTION), Optional.empty());
|
||||
|
||||
// keep only flow trigger
|
||||
List<io.kestra.plugin.core.trigger.Flow> flowTriggers = triggers
|
||||
.stream()
|
||||
@@ -189,13 +187,16 @@ public class FlowTopologyService {
|
||||
return false;
|
||||
}
|
||||
|
||||
// simulated execution: we add a "simulated" label so conditions can know that the evaluation is for a simulated execution
|
||||
Execution execution = Execution.newExecution(parent, (f, e) -> null, List.of(SIMULATED_EXECUTION), Optional.empty());
|
||||
|
||||
boolean conditionMatch = flowTriggers
|
||||
.stream()
|
||||
.flatMap(flow -> ListUtils.emptyOnNull(flow.getConditions()).stream())
|
||||
.allMatch(condition -> validateCondition(condition, parent, execution));
|
||||
|
||||
boolean preconditionMatch = flowTriggers.stream()
|
||||
.anyMatch(flow -> flow.getPreconditions() == null || validateMultipleConditions(flow.getPreconditions().getConditions(), parent, execution));
|
||||
.anyMatch(flow -> flow.getPreconditions() == null || validatePreconditions(flow.getPreconditions(), parent, execution));
|
||||
|
||||
return conditionMatch && preconditionMatch;
|
||||
}
|
||||
@@ -209,7 +210,13 @@ public class FlowTopologyService {
|
||||
return validateMultipleConditions(multipleCondition.getConditions(), child, execution);
|
||||
}
|
||||
|
||||
return this.conditionService.isValid(condition, child, execution);
|
||||
try {
|
||||
return this.conditionService.isValid(condition, child, execution);
|
||||
} catch (Exception e) {
|
||||
// extra safety net, it means there is a bug
|
||||
log.error("unable to validate condition in FlowTopologyService, flow: {}, condition: {}", child.uid(), condition, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean validateMultipleConditions(Map<String, Condition> multipleConditions, FlowInterface child, Execution execution) {
|
||||
@@ -233,11 +240,24 @@ public class FlowTopologyService {
|
||||
}
|
||||
|
||||
private boolean isMandatoryMultipleCondition(Condition condition) {
|
||||
return Stream
|
||||
.of(
|
||||
Expression.class
|
||||
)
|
||||
.anyMatch(aClass -> condition.getClass().isAssignableFrom(aClass));
|
||||
return condition.getClass().isAssignableFrom(Expression.class);
|
||||
}
|
||||
|
||||
private boolean validatePreconditions(io.kestra.plugin.core.trigger.Flow.Preconditions preconditions, FlowInterface child, Execution execution) {
|
||||
boolean upstreamFlowMatched = MapUtils.emptyOnNull(preconditions.getUpstreamFlowsConditions())
|
||||
.values()
|
||||
.stream()
|
||||
.filter(c -> !isFilterCondition(c))
|
||||
.anyMatch(c -> validateCondition(c, child, execution));
|
||||
|
||||
boolean whereMatched = MapUtils.emptyOnNull(preconditions.getWhereConditions())
|
||||
.values()
|
||||
.stream()
|
||||
.filter(c -> !isFilterCondition(c))
|
||||
.allMatch(c -> validateCondition(c, child, execution));
|
||||
|
||||
// to be a dependency, if upstream flow is set it must be either inside it so it's a AND between upstream flow and where
|
||||
return upstreamFlowMatched && whereMatched;
|
||||
}
|
||||
|
||||
private boolean isFilterCondition(Condition condition) {
|
||||
|
||||
15
core/src/main/java/io/kestra/core/utils/EditionProvider.java
Normal file
15
core/src/main/java/io/kestra/core/utils/EditionProvider.java
Normal file
@@ -0,0 +1,15 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
public class EditionProvider {
|
||||
public Edition get() {
|
||||
return Edition.OSS;
|
||||
}
|
||||
|
||||
public enum Edition {
|
||||
OSS,
|
||||
EE
|
||||
}
|
||||
}
|
||||
@@ -206,22 +206,17 @@ public class MapUtils {
|
||||
|
||||
/**
|
||||
* Utility method that flatten a nested map.
|
||||
* <p>
|
||||
* NOTE: for simplicity, this method didn't allow to flatten maps with conflicting keys that would end up in different flatten keys,
|
||||
* this could be related later if needed by flattening {k1: k2: {k3: v1}, k1: {k4: v2}} to {k1.k2.k3: v1, k1.k4: v2} is prohibited for now.
|
||||
*
|
||||
* @param nestedMap the nested map.
|
||||
* @return the flattened map.
|
||||
*
|
||||
* @throws IllegalArgumentException if any entry contains a map of more than one element.
|
||||
*/
|
||||
public static Map<String, Object> nestedToFlattenMap(@NotNull Map<String, Object> nestedMap) {
|
||||
Map<String, Object> result = new TreeMap<>();
|
||||
|
||||
for (Map.Entry<String, Object> entry : nestedMap.entrySet()) {
|
||||
if (entry.getValue() instanceof Map<?, ?> map) {
|
||||
Map.Entry<String, Object> flatten = flattenEntry(entry.getKey(), (Map<String, Object>) map);
|
||||
result.put(flatten.getKey(), flatten.getValue());
|
||||
Map<String, Object> flatten = flattenEntry(entry.getKey(), (Map<String, Object>) map);
|
||||
result.putAll(flatten);
|
||||
} else {
|
||||
result.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
@@ -229,18 +224,19 @@ public class MapUtils {
|
||||
return result;
|
||||
}
|
||||
|
||||
private static Map.Entry<String, Object> flattenEntry(String key, Map<String, Object> value) {
|
||||
if (value.size() > 1) {
|
||||
throw new IllegalArgumentException("You cannot flatten a map with an entry that is a map of more than one element, conflicting key: " + key);
|
||||
private static Map<String, Object> flattenEntry(String key, Map<String, Object> value) {
|
||||
Map<String, Object> result = new TreeMap<>();
|
||||
|
||||
for (Map.Entry<String, Object> entry : value.entrySet()) {
|
||||
String newKey = key + "." + entry.getKey();
|
||||
Object newValue = entry.getValue();
|
||||
if (newValue instanceof Map<?, ?> map) {
|
||||
result.putAll(flattenEntry(newKey, (Map<String, Object>) map));
|
||||
} else {
|
||||
result.put(newKey, newValue);
|
||||
}
|
||||
}
|
||||
|
||||
Map.Entry<String, Object> entry = value.entrySet().iterator().next();
|
||||
String newKey = key + "." + entry.getKey();
|
||||
Object newValue = entry.getValue();
|
||||
if (newValue instanceof Map<?, ?> map) {
|
||||
return flattenEntry(newKey, (Map<String, Object>) map);
|
||||
} else {
|
||||
return Map.entry(newKey, newValue);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,17 +32,23 @@ public class Version implements Comparable<Version> {
|
||||
* @param version the version.
|
||||
* @return a new {@link Version} instance.
|
||||
*/
|
||||
public static Version of(String version) {
|
||||
public static Version of(final Object version) {
|
||||
|
||||
if (version.startsWith("v")) {
|
||||
version = version.substring(1);
|
||||
if (Objects.isNull(version)) {
|
||||
throw new IllegalArgumentException("Invalid version, cannot parse null version");
|
||||
}
|
||||
|
||||
String strVersion = version.toString();
|
||||
|
||||
if (strVersion.startsWith("v")) {
|
||||
strVersion = strVersion.substring(1);
|
||||
}
|
||||
|
||||
int qualifier = version.indexOf("-");
|
||||
int qualifier = strVersion.indexOf("-");
|
||||
|
||||
final String[] versions = qualifier > 0 ?
|
||||
version.substring(0, qualifier).split("\\.") :
|
||||
version.split("\\.");
|
||||
strVersion.substring(0, qualifier).split("\\.") :
|
||||
strVersion.split("\\.");
|
||||
try {
|
||||
final int majorVersion = Integer.parseInt(versions[0]);
|
||||
final int minorVersion = versions.length > 1 ? Integer.parseInt(versions[1]) : 0;
|
||||
@@ -52,28 +58,54 @@ public class Version implements Comparable<Version> {
|
||||
majorVersion,
|
||||
minorVersion,
|
||||
incrementalVersion,
|
||||
qualifier > 0 ? version.substring(qualifier + 1) : null,
|
||||
version
|
||||
qualifier > 0 ? strVersion.substring(qualifier + 1) : null,
|
||||
strVersion
|
||||
);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("Invalid version, cannot parse '" + version + "'");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Static helper method for returning the most recent stable version for a current {@link Version}.
|
||||
* Returns the most recent stable version compatible with the given {@link Version}.
|
||||
*
|
||||
* @param from the current version.
|
||||
* @param versions the list of version.
|
||||
* <p>Resolution strategy:</p>
|
||||
* <ol>
|
||||
* <li>If {@code from} is present in {@code versions}, return it directly.</li>
|
||||
* <li>Otherwise, return the latest version with the same major and minor.</li>
|
||||
* <li>If none found and {@code from.majorVersion() > 0}, return the latest with the same major.</li>
|
||||
* <li>Return {@code null} if no compatible version is found.</li>
|
||||
* </ol>
|
||||
*
|
||||
* @return the last stable version.
|
||||
* @param from the current version
|
||||
* @param versions available versions
|
||||
* @return the most recent compatible stable version, or {@code null} if none
|
||||
*/
|
||||
public static Version getStable(final Version from, final Collection<Version> versions) {
|
||||
List<Version> compatibleVersions = versions.stream()
|
||||
if (versions.contains(from)) {
|
||||
return from;
|
||||
}
|
||||
|
||||
// Prefer same major+minor stable versions
|
||||
List<Version> sameMinorStable = versions.stream()
|
||||
.filter(v -> v.majorVersion() == from.majorVersion() && v.minorVersion() == from.minorVersion())
|
||||
.toList();
|
||||
if (compatibleVersions.isEmpty()) return null;
|
||||
return Version.getLatest(compatibleVersions);
|
||||
|
||||
if (!sameMinorStable.isEmpty()) {
|
||||
return Version.getLatest(sameMinorStable);
|
||||
}
|
||||
|
||||
if (from.majorVersion() > 0) {
|
||||
// Fallback: any stable version with the same major
|
||||
List<Version> sameMajorStable = versions.stream()
|
||||
.filter(v -> v.majorVersion() == from.majorVersion())
|
||||
.toList();
|
||||
|
||||
if (!sameMajorStable.isEmpty()) {
|
||||
return Version.getLatest(sameMajorStable);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -29,7 +29,7 @@ import java.util.Map;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Condition that check labels of an execution."
|
||||
title = "Condition that checks labels of an execution."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
|
||||
@@ -35,8 +35,8 @@ import static io.kestra.core.utils.MapUtils.mergeWithNullableValues;
|
||||
examples = {
|
||||
@Example(
|
||||
title = """
|
||||
The upstream `flow_a` must explicitly define its outputs
|
||||
to be used in the `ExecutionOutputs` condition.
|
||||
The upstream `flow_a` must explicitly define its outputs
|
||||
to be used in the `ExecutionOutputs` condition.
|
||||
|
||||
```yaml
|
||||
id: flow_a
|
||||
@@ -58,7 +58,7 @@ import static io.kestra.core.utils.MapUtils.mergeWithNullableValues;
|
||||
value: "{{ outputs.hello.value }}"
|
||||
```
|
||||
|
||||
The `flow_condition_executionoutputs` will run whenever `flow_a` finishes successfully
|
||||
The `flow_condition_executionoutputs` will run whenever `flow_a` finishes successfully
|
||||
and returns an output matching the value 'hello':
|
||||
""",
|
||||
full = true,
|
||||
@@ -105,7 +105,7 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
|
||||
conditionContext.getVariables(),
|
||||
Map.of(TRIGGER_VAR, Map.of(OUTPUTS_VAR, conditionContext.getExecution().getOutputs()))
|
||||
);
|
||||
|
||||
|
||||
return conditionContext.getRunContext().render(expression).skipCache().as(Boolean.class, variables).orElseThrow();
|
||||
}
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ import jakarta.validation.constraints.NotNull;
|
||||
code = """
|
||||
id: myflow
|
||||
namespace: company.team
|
||||
|
||||
|
||||
tasks:
|
||||
- id: hello
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
|
||||
@@ -47,7 +47,7 @@ public class FlowNamespaceCondition extends Condition {
|
||||
|
||||
@Builder.Default
|
||||
@Schema(
|
||||
title = "If we must look at the flow namespace by prefix (checked using startWith). The prefix is case sensitive."
|
||||
title = "If we must look at the flow namespace by prefix (checked using startsWith). The prefix is case sensitive."
|
||||
)
|
||||
@PluginProperty
|
||||
private final Boolean prefix = false;
|
||||
|
||||
@@ -26,7 +26,7 @@ import java.util.*;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Run a flow if the list of preconditions are met in a time window.",
|
||||
title = "Run a flow if the list of preconditions is met in a time window.",
|
||||
description = """
|
||||
**This task is deprecated**, use the `preconditions` property of the `io.kestra.plugin.core.trigger.Flow` trigger instead.
|
||||
Will trigger an executions when all the flows defined by the preconditions are successfully executed in a specific period of time.
|
||||
|
||||
@@ -27,7 +27,7 @@ import static io.kestra.core.utils.Rethrow.throwPredicate;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Condition to exclude others conditions."
|
||||
title = "Condition to exclude other conditions."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@@ -62,7 +62,7 @@ public class Not extends Condition implements ScheduleCondition {
|
||||
@NotEmpty
|
||||
@Schema(
|
||||
title = "The list of conditions to exclude.",
|
||||
description = "If any conditions is true, it will prevent the event's execution."
|
||||
description = "If any condition is true, it will prevent the event's execution."
|
||||
)
|
||||
@PluginProperty
|
||||
private List<Condition> conditions;
|
||||
|
||||
@@ -31,7 +31,7 @@ import static io.kestra.core.utils.Rethrow.throwPredicate;
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Trigger condition to execute the flow when any of the condition is satisfied.",
|
||||
title = "Trigger condition to execute the flow when any of the conditions are satisfied.",
|
||||
full = true,
|
||||
code = """
|
||||
id: schedule_condition_or
|
||||
|
||||
@@ -5,14 +5,12 @@ import de.focus_shift.jollyday.core.ManagerParameters;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.annotations.Example;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.utils.DateUtils;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
@@ -31,7 +29,7 @@ import java.time.LocalDate;
|
||||
examples = {
|
||||
@Example(
|
||||
full = true,
|
||||
title = "Trigger condition to excute the flow only on public holidays.",
|
||||
title = "Trigger condition to execute the flow only on public holidays.",
|
||||
code = """
|
||||
id: schedule_condition_public-holiday
|
||||
namespace: company.team
|
||||
@@ -52,7 +50,7 @@ import java.time.LocalDate;
|
||||
),
|
||||
@Example(
|
||||
full = true,
|
||||
title = "Trigger condition to excute the flow only on work days in France.",
|
||||
title = "Trigger condition to execute the flow only on work days in France.",
|
||||
code = """
|
||||
id: schedule-condition-work-days
|
||||
namespace: company.team
|
||||
|
||||
@@ -25,7 +25,7 @@ import lombok.experimental.SuperBuilder;
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Display a bar chart with with Executions per Namespace.",
|
||||
title = "Display a bar chart with Executions per Namespace.",
|
||||
full = true,
|
||||
code = { """
|
||||
charts:
|
||||
|
||||
@@ -42,7 +42,6 @@ import lombok.experimental.SuperBuilder;
|
||||
|
||||
- Use this insight to identify trends and optimize performance.
|
||||
"""
|
||||
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ import lombok.experimental.SuperBuilder;
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Display a pie chart with with Executions per State.",
|
||||
title = "Display a pie chart with Executions per State.",
|
||||
full = true,
|
||||
code = { """
|
||||
charts:
|
||||
|
||||
@@ -25,7 +25,7 @@ import lombok.experimental.SuperBuilder;
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Display a table with a Log count for each level by Namespace.",
|
||||
title = "Display a table with Log counts for each level by Namespace.",
|
||||
full = true,
|
||||
code = { """
|
||||
charts:
|
||||
|
||||
@@ -29,7 +29,7 @@ import lombok.experimental.SuperBuilder;
|
||||
@Example(
|
||||
title = "Display a chart with Executions over the last week.",
|
||||
full = true,
|
||||
code = { """
|
||||
code = { """
|
||||
charts:
|
||||
- id: executions_timeseries
|
||||
type: io.kestra.plugin.core.dashboard.chart.TimeSeries
|
||||
|
||||
@@ -31,7 +31,7 @@ import lombok.experimental.SuperBuilder;
|
||||
title = "Display a chart with a Executions per Namespace broken out by State.",
|
||||
full = true,
|
||||
code = { """
|
||||
charts:
|
||||
charts:
|
||||
- id: executions_per_namespace_bars
|
||||
type: io.kestra.plugin.core.dashboard.chart.Bar
|
||||
chartOptions:
|
||||
|
||||
@@ -50,7 +50,7 @@ import java.util.Optional;
|
||||
Bearer {{ inputs.token }}
|
||||
{%- elseif inputs.username is not empty and inputs.password is not empty -%}
|
||||
Basic {{ (inputs.username + ':' + inputs.password) | base64encode }}
|
||||
{%- endif -%}
|
||||
{%- endif -%}
|
||||
"""
|
||||
)
|
||||
},
|
||||
|
||||
@@ -73,7 +73,7 @@ public class Assert extends Task implements RunnableTask<VoidOutput> {
|
||||
@PluginProperty(dynamic = true)
|
||||
private List<String> conditions;
|
||||
|
||||
@Schema(title = "Optional error message.")
|
||||
@Schema(title = "Optional error message")
|
||||
private Property<String> errorMessage;
|
||||
|
||||
@Override
|
||||
|
||||
@@ -37,7 +37,7 @@ import static io.kestra.core.utils.Rethrow.throwPredicate;
|
||||
@Plugin(
|
||||
examples = {
|
||||
@Example(
|
||||
title = "Send a slack notification if there is no execution for a flow for the last 24 hours.",
|
||||
title = "Send a slack notification if there is no execution for a flow in the last 24 hours.",
|
||||
full = true,
|
||||
code = """
|
||||
id: executions_count
|
||||
@@ -78,32 +78,32 @@ import static io.kestra.core.utils.Rethrow.throwPredicate;
|
||||
)
|
||||
public class Count extends Task implements RunnableTask<Count.Output> {
|
||||
@Schema(
|
||||
title = "A list of flows to be filtered.",
|
||||
title = "A list of flows to be filtered",
|
||||
description = "If not provided, namespaces must be set."
|
||||
)
|
||||
@PluginProperty
|
||||
protected List<Flow> flows;
|
||||
|
||||
@Schema(
|
||||
title = "A list of states to be filtered."
|
||||
title = "A list of states to be filtered"
|
||||
)
|
||||
protected Property<List<State.Type>> states;
|
||||
|
||||
@NotNull
|
||||
@Schema(
|
||||
title = "The start date."
|
||||
title = "The start date"
|
||||
)
|
||||
protected Property<String> startDate;
|
||||
|
||||
@Schema(
|
||||
title = "The end date."
|
||||
title = "The end date"
|
||||
)
|
||||
protected Property<String> endDate;
|
||||
|
||||
@NotNull
|
||||
@Schema(
|
||||
title = "The expression to look at against each flow.",
|
||||
description = "The expression is such that expression must return `true` in order to keep the current line.\n" +
|
||||
title = "The expression to check against each flow",
|
||||
description = "The expression is such that the expression must return `true` in order to keep the current line.\n" +
|
||||
"Some examples: \n" +
|
||||
"- ```yaml {{ eq count 0 }} ```: no execution found\n" +
|
||||
"- ```yaml {{ gte count 5 }} ```: more than 5 executions\n"
|
||||
@@ -121,7 +121,7 @@ public class Count extends Task implements RunnableTask<Count.Output> {
|
||||
.getBean(ExecutionRepositoryInterface.class);
|
||||
|
||||
if (flows == null && namespaces == null) {
|
||||
throw new IllegalArgumentException("You must provide a list of flows or namespaces");
|
||||
throw new IllegalArgumentException("You must provide a list of flows or namespaces.");
|
||||
}
|
||||
|
||||
var flowInfo = runContext.flowInfo();
|
||||
|
||||
@@ -119,7 +119,7 @@ public class Fail extends Task implements RunnableTask<VoidOutput> {
|
||||
)
|
||||
private Property<String> condition;
|
||||
|
||||
@Schema(title = "Optional error message.")
|
||||
@Schema(title = "Optional error message")
|
||||
@Builder.Default
|
||||
private Property<String> errorMessage = Property.ofValue("Task failure");
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Allow to add or overwrite labels for the current execution at runtime.",
|
||||
title = "Add or overwrite labels for the current execution at runtime.",
|
||||
description = "Trying to pass a system label (a label starting with `system.`) will fail the task."
|
||||
)
|
||||
@Plugin(
|
||||
@@ -76,7 +76,7 @@ public class Labels extends Task implements ExecutionUpdatableTask {
|
||||
private static final ObjectMapper MAPPER = JacksonMapper.ofJson();
|
||||
|
||||
@Schema(
|
||||
title = "Labels to add to the current execution.",
|
||||
title = "Labels to add to the current execution",
|
||||
description = "The value should result in a list of labels or a labelKey:labelValue map",
|
||||
oneOf = {
|
||||
String.class,
|
||||
@@ -127,9 +127,24 @@ public class Labels extends Task implements ExecutionUpdatableTask {
|
||||
}
|
||||
|
||||
// check for system labels: none can be passed at runtime
|
||||
Optional<Map.Entry<String, String>> first = labelsAsMap.entrySet().stream().filter(entry -> entry.getKey().startsWith(SYSTEM_PREFIX)).findFirst();
|
||||
if (first.isPresent()) {
|
||||
throw new IllegalArgumentException("System labels can only be set by Kestra itself, offending label: " + first.get().getKey() + "=" + first.get().getValue());
|
||||
Optional<Map.Entry<String, String>> systemLabel = labelsAsMap.entrySet().stream()
|
||||
.filter(entry -> entry.getKey().startsWith(SYSTEM_PREFIX))
|
||||
.findFirst();
|
||||
if (systemLabel.isPresent()) {
|
||||
throw new IllegalArgumentException(
|
||||
"System labels can only be set by Kestra itself, offending label: " +
|
||||
systemLabel.get().getKey() + "=" + systemLabel.get().getValue()
|
||||
);
|
||||
}
|
||||
|
||||
// check for empty label values
|
||||
Optional<Map.Entry<String, String>> emptyValue = labelsAsMap.entrySet().stream()
|
||||
.filter(entry -> entry.getValue().isEmpty())
|
||||
.findFirst();
|
||||
if (emptyValue.isPresent()) {
|
||||
throw new IllegalArgumentException(
|
||||
"Label values cannot be empty, offending label: " + emptyValue.get().getKey()
|
||||
);
|
||||
}
|
||||
|
||||
Map<String, String> newLabels = ListUtils.emptyOnNull(execution.getLabels()).stream()
|
||||
|
||||
@@ -45,59 +45,59 @@ import java.util.List;
|
||||
)
|
||||
public class PurgeExecutions extends Task implements RunnableTask<PurgeExecutions.Output> {
|
||||
@Schema(
|
||||
title = "Namespace whose flows need to be purged, or namespace of the flow that needs to be purged.",
|
||||
title = "Namespace whose flows need to be purged, or namespace of the flow that needs to be purged",
|
||||
description = "If `flowId` isn't provided, this is a namespace prefix, else the namespace of the flow."
|
||||
)
|
||||
private Property<String> namespace;
|
||||
|
||||
@Schema(
|
||||
title = "The flow ID to be purged.",
|
||||
title = "The flow ID to be purged",
|
||||
description = "You need to provide the `namespace` properties if you want to purge a flow."
|
||||
)
|
||||
private Property<String> flowId;
|
||||
|
||||
@Schema(
|
||||
title = "The minimum date to be purged.",
|
||||
title = "The date after which data should be purged",
|
||||
description = "All data of flows executed after this date will be purged."
|
||||
)
|
||||
private Property<String> startDate;
|
||||
|
||||
@Schema(
|
||||
title = "The maximum date to be purged.",
|
||||
title = "The date before which data should be purged.",
|
||||
description = "All data of flows executed before this date will be purged."
|
||||
)
|
||||
@NotNull
|
||||
private Property<String> endDate;
|
||||
|
||||
@Schema(
|
||||
title = "The state of the executions to be purged.",
|
||||
title = "The state of the executions to be purged",
|
||||
description = "If not set, executions for any states will be purged."
|
||||
)
|
||||
private Property<List<State.Type>> states;
|
||||
|
||||
@Schema(
|
||||
title = "Whether to purge executions."
|
||||
title = "Flag specifying whether to purge executions"
|
||||
)
|
||||
@Builder.Default
|
||||
private Property<Boolean> purgeExecution = Property.ofValue(true);
|
||||
|
||||
@Schema(
|
||||
title = "Whether to purge execution's logs.",
|
||||
title = "Flag specifying whether to purge execution logs",
|
||||
description = """
|
||||
This will only purge logs from executions not from triggers, and it will do it execution by execution.
|
||||
The `io.kestra.plugin.core.log.PurgeLogs` task is a better fit to purge logs as it will purge logs in bulk, and will also purge logs not tied to an execution like trigger logs."""
|
||||
This will only purge logs from executions, not from triggers, and it will do it execution by execution.
|
||||
The `io.kestra.plugin.core.log.PurgeLogs` task is a better fit to purge, as it will purge logs in bulk and will also purge logs not tied to an execution like trigger logs."""
|
||||
)
|
||||
@Builder.Default
|
||||
private Property<Boolean> purgeLog = Property.ofValue(true);
|
||||
|
||||
@Schema(
|
||||
title = "Whether to purge execution's metrics."
|
||||
title = "Flag specifying whether to purge execution's metrics."
|
||||
)
|
||||
@Builder.Default
|
||||
private Property<Boolean> purgeMetric = Property.ofValue(true);
|
||||
|
||||
@Schema(
|
||||
title = "Whether to purge execution's files from the Kestra's internal storage."
|
||||
title = "Flag specifying whether to purge execution's files from the Kestra's internal storage"
|
||||
)
|
||||
@Builder.Default
|
||||
private Property<Boolean> purgeStorage = Property.ofValue(true);
|
||||
@@ -141,22 +141,22 @@ public class PurgeExecutions extends Task implements RunnableTask<PurgeExecution
|
||||
@Getter
|
||||
public static class Output implements io.kestra.core.models.tasks.Output {
|
||||
@Schema(
|
||||
title = "The count of deleted executions."
|
||||
title = "The count of deleted executions"
|
||||
)
|
||||
private int executionsCount;
|
||||
|
||||
@Schema(
|
||||
title = "The count of deleted logs."
|
||||
title = "The count of deleted logs"
|
||||
)
|
||||
private int logsCount;
|
||||
|
||||
@Schema(
|
||||
title = "The count of deleted storage files."
|
||||
title = "The count of deleted storage files"
|
||||
)
|
||||
private int storagesCount;
|
||||
|
||||
@Schema(
|
||||
title = "The count of deleted metrics."
|
||||
title = "The count of deleted metrics"
|
||||
)
|
||||
private int metricsCount;
|
||||
}
|
||||
|
||||
@@ -64,14 +64,14 @@ public class Resume extends Task implements RunnableTask<VoidOutput> {
|
||||
description = """
|
||||
If you explicitly define an `executionId`, Kestra will use that specific ID.
|
||||
|
||||
If another `namespace` and `flowId` properties are set, Kestra will look for a paused execution for that corresponding flow.
|
||||
If `executionId` is not set and `namespace` and `flowId` properties are set, Kestra will look for a paused execution for that corresponding flow.
|
||||
|
||||
If `executionId` is not set, the task will use the ID of the current execution."""
|
||||
)
|
||||
private Property<String> executionId;
|
||||
|
||||
@Schema(
|
||||
title = "Inputs to be passed to the execution when it's resumed."
|
||||
title = "Inputs to be passed to the execution when it's resumed"
|
||||
)
|
||||
private Property<Map<String, Object>> inputs;
|
||||
|
||||
@@ -93,7 +93,7 @@ public class Resume extends Task implements RunnableTask<VoidOutput> {
|
||||
|
||||
Execution execution = executionRepository.findById(executionInfo.tenantId(), executionInfo.id())
|
||||
.orElseThrow(() -> new IllegalArgumentException("No execution found for execution id " + executionInfo.id()));
|
||||
FlowInterface flow = flowExecutor.findByExecution(execution).orElseThrow(() -> new IllegalArgumentException("Flow not found for execution id " + executionInfo.id()));
|
||||
FlowInterface flow = flowExecutor.findByExecution(execution).orElseThrow(() -> new IllegalArgumentException("Flow not found for execution ID " + executionInfo.id()));
|
||||
|
||||
Map<String, Object> renderedInputs = runContext.render(this.inputs).asMap(String.class, Object.class);
|
||||
renderedInputs = !renderedInputs.isEmpty() ? renderedInputs : null;
|
||||
|
||||
@@ -22,7 +22,7 @@ import java.util.Map;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Allow to set execution variables. These variables will then be available via the `{{ vars.name }}` expression."
|
||||
title = "Allow to set execution variables. These variables are available via the `{{ vars.name }}` expression."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@@ -53,7 +53,7 @@ public class SetVariables extends Task implements ExecutionUpdatableTask {
|
||||
@NotNull
|
||||
private Property<Map<String, Object>> variables;
|
||||
|
||||
@Schema(title = "Whether to overwrite existing variables")
|
||||
@Schema(title = "Flag specifying whether to overwrite existing variables")
|
||||
@NotNull
|
||||
@Builder.Default
|
||||
private Property<Boolean> overwrite = Property.ofValue(true);
|
||||
|
||||
@@ -21,7 +21,7 @@ import java.util.Map;
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@Schema(
|
||||
title = "Allow to unset execution variables."
|
||||
title = "Unset execution variables."
|
||||
)
|
||||
@Plugin(
|
||||
examples = {
|
||||
@@ -57,7 +57,7 @@ public class UnsetVariables extends Task implements ExecutionUpdatableTask {
|
||||
@NotNull
|
||||
private Property<List<String>> variables;
|
||||
|
||||
@Schema(title = "Whether to ignore missing variables")
|
||||
@Schema(title = "Flag specifying whether to ignore missing variables")
|
||||
@NotNull
|
||||
@Builder.Default
|
||||
private Property<Boolean> ignoreMissing = Property.ofValue(false);
|
||||
|
||||
@@ -91,7 +91,7 @@ public class Dag extends Task implements FlowableTask<VoidOutput> {
|
||||
@NotNull
|
||||
@Builder.Default
|
||||
@Schema(
|
||||
title = "Number of concurrent parallel tasks that can be running at any point in time.",
|
||||
title = "Number of concurrent parallel tasks that can be running at any point in time",
|
||||
description = "If the value is `0`, no concurrency limit exists for the tasks in a DAG and all tasks that can run in parallel will start at the same time."
|
||||
)
|
||||
private final Property<Integer> concurrent = Property.ofValue(0);
|
||||
@@ -134,7 +134,7 @@ public class Dag extends Task implements FlowableTask<VoidOutput> {
|
||||
private void controlTask() throws IllegalVariableEvaluationException {
|
||||
List<String> dagCheckNotExistTasks = this.dagCheckNotExistTask(this.tasks);
|
||||
if (!dagCheckNotExistTasks.isEmpty()) {
|
||||
throw new IllegalVariableEvaluationException("Some task doesn't exists on task '" + this.id + "': " + String.join(", ", dagCheckNotExistTasks));
|
||||
throw new IllegalVariableEvaluationException("Some task doesn't exist on task '" + this.id + "': " + String.join(", ", dagCheckNotExistTasks));
|
||||
}
|
||||
|
||||
ArrayList<String> cyclicDependenciesTasks = this.dagCheckCyclicDependencies(this.tasks);
|
||||
@@ -238,14 +238,14 @@ public class Dag extends Task implements FlowableTask<VoidOutput> {
|
||||
public static class DagTask {
|
||||
@NotNull
|
||||
@Schema(
|
||||
title = "The task within the DAG."
|
||||
title = "The task within the DAG"
|
||||
)
|
||||
@PluginProperty
|
||||
private Task task;
|
||||
|
||||
@PluginProperty
|
||||
@Schema(
|
||||
title = "The list of task IDs that should have been successfully executed before starting this task."
|
||||
title = "The list of task IDs that should have been successfully executed before starting this task"
|
||||
)
|
||||
private List<String> dependsOn;
|
||||
}
|
||||
|
||||
@@ -126,15 +126,15 @@ public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {
|
||||
@NotNull
|
||||
@Builder.Default
|
||||
@Schema(
|
||||
title = "Number of concurrent parallel tasks that can be running at any point in time.",
|
||||
description = "If the value is `0`, no limit exist and all the tasks will start at the same time."
|
||||
title = "Number of concurrent parallel tasks that can be running at any point in time",
|
||||
description = "If the value is `0`, no limit exists and all the tasks will start at the same time."
|
||||
)
|
||||
private final Property<Integer> concurrent = Property.ofValue(0);
|
||||
|
||||
@NotNull
|
||||
@PluginProperty(dynamic = true)
|
||||
@Schema(
|
||||
title = "The list of values for this task.",
|
||||
title = "The list of values for this task",
|
||||
description = "The value can be passed as a string, a list of strings, or a list of objects.",
|
||||
oneOf = {String.class, Object[].class}
|
||||
)
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user