mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-29 09:00:26 -05:00
Compare commits
414 Commits
chore/micr
...
v0.24.5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0a81f67981 | ||
|
|
c4757ed915 | ||
|
|
4577070e32 | ||
|
|
7bd519ddb4 | ||
|
|
62c85078b6 | ||
|
|
3718be9658 | ||
|
|
2df6c1b730 | ||
|
|
9af86ea677 | ||
|
|
8601905994 | ||
|
|
9de1a15d02 | ||
|
|
25b056ebb3 | ||
|
|
87d8f9867f | ||
|
|
a00c1f8397 | ||
|
|
f4470095ff | ||
|
|
cbfaa8815d | ||
|
|
10e55bbb77 | ||
|
|
59d5d4cb91 | ||
|
|
e8ee3b0a84 | ||
|
|
602ff849e3 | ||
|
|
155bdca83f | ||
|
|
faaaeada3a | ||
|
|
6ef35974d7 | ||
|
|
46f9bb768f | ||
|
|
ab87f63e8c | ||
|
|
cdb73ccbd7 | ||
|
|
8fc936e0a3 | ||
|
|
1e0ebc94b8 | ||
|
|
5318592eff | ||
|
|
2da08f160d | ||
|
|
8cbc9e7aff | ||
|
|
f8e15d103f | ||
|
|
49794a4f2a | ||
|
|
bafa5fe03c | ||
|
|
208b244f0f | ||
|
|
b93976091d | ||
|
|
eec52d76f0 | ||
|
|
b96fd87572 | ||
|
|
1aa5bfab43 | ||
|
|
c4572e86a5 | ||
|
|
f2f97bb70c | ||
|
|
804c740d3c | ||
|
|
75cd4f44e0 | ||
|
|
f167a2a2bb | ||
|
|
08d9416e3a | ||
|
|
2a879c617c | ||
|
|
3227ca7c11 | ||
|
|
428a52ce02 | ||
|
|
f58bc4caba | ||
|
|
e99ae9513f | ||
|
|
c8b51fcacf | ||
|
|
813b2f6439 | ||
|
|
c6b5bca25b | ||
|
|
de35d2cdb9 | ||
|
|
a6ffbd59d0 | ||
|
|
568740a214 | ||
|
|
aa0d2c545f | ||
|
|
cda77d5146 | ||
|
|
d4fd1f61ba | ||
|
|
9859ea5eb6 | ||
|
|
aca374a28f | ||
|
|
c413ba95e1 | ||
|
|
9c6b92619e | ||
|
|
8173e8df51 | ||
|
|
5c95505911 | ||
|
|
33f0b533bb | ||
|
|
23e35a7f97 | ||
|
|
0357321c58 | ||
|
|
5c08403398 | ||
|
|
a63cb71218 | ||
|
|
317885b91c | ||
|
|
87637302e4 | ||
|
|
056faaaf9f | ||
|
|
54c74a1328 | ||
|
|
fae0c88c5e | ||
|
|
db5d83d1cb | ||
|
|
066b947762 | ||
|
|
b6597475b1 | ||
|
|
f2610baf15 | ||
|
|
b619bf76d8 | ||
|
|
117f453a77 | ||
|
|
053d6276ff | ||
|
|
3870eca70b | ||
|
|
afd7c216f9 | ||
|
|
59a17e88e7 | ||
|
|
99f8dca1c2 | ||
|
|
1068c9fe51 | ||
|
|
ea6d30df7c | ||
|
|
04ba7363c2 | ||
|
|
281a987944 | ||
|
|
c9ce54b0be | ||
|
|
ccd9baef3c | ||
|
|
97869b9c75 | ||
|
|
1c681c1492 | ||
|
|
de2a446f93 | ||
|
|
d778947017 | ||
|
|
3f97845fdd | ||
|
|
631cd169a1 | ||
|
|
1648fa076c | ||
|
|
474806882e | ||
|
|
65467bd118 | ||
|
|
387bbb80ac | ||
|
|
19d4c64f19 | ||
|
|
809c0a228c | ||
|
|
6a045900fb | ||
|
|
4ada5fe8f3 | ||
|
|
998087ca30 | ||
|
|
146338e48f | ||
|
|
de177b925e | ||
|
|
04bfb19095 | ||
|
|
c913c48785 | ||
|
|
0d5b593d42 | ||
|
|
83f92535c5 | ||
|
|
fd6a0a6c11 | ||
|
|
104c4c97b4 | ||
|
|
21cd21269f | ||
|
|
679befa2fe | ||
|
|
8a0ecdeb8a | ||
|
|
ee8762e138 | ||
|
|
d16324f265 | ||
|
|
a518fefecd | ||
|
|
1d3210fd7d | ||
|
|
597f84ecb7 | ||
|
|
5f3c7ac9f0 | ||
|
|
77c4691b04 | ||
|
|
6d34416529 | ||
|
|
40a67d5dcd | ||
|
|
2c68c704f6 | ||
|
|
e59d9f622c | ||
|
|
c951ba39a7 | ||
|
|
a0200cfacb | ||
|
|
c6310f0697 | ||
|
|
21ba59a525 | ||
|
|
4f9e3cd06c | ||
|
|
e74010d1a4 | ||
|
|
465e6467e9 | ||
|
|
c68c1b16d9 | ||
|
|
468c32156e | ||
|
|
6e0a1c61ef | ||
|
|
552d55ef6b | ||
|
|
08b0b682bf | ||
|
|
cff90c93bb | ||
|
|
ea465056d0 | ||
|
|
02f150f0b0 | ||
|
|
95d95d3d3c | ||
|
|
6b8d3d6928 | ||
|
|
1e347073ca | ||
|
|
ac09dcecd9 | ||
|
|
40b337cd22 | ||
|
|
5377d16036 | ||
|
|
f717bc413f | ||
|
|
d6bed2d235 | ||
|
|
07fd74b238 | ||
|
|
60eef29de2 | ||
|
|
20ca7b6380 | ||
|
|
9d82df61c6 | ||
|
|
e78210b5eb | ||
|
|
83143fae83 | ||
|
|
25f5ccc6b5 | ||
|
|
cf3e49a284 | ||
|
|
9a72d378df | ||
|
|
752a927fac | ||
|
|
4053392921 | ||
|
|
8b0483643a | ||
|
|
5feeb41c7a | ||
|
|
d7f5e5c05d | ||
|
|
4840f723fc | ||
|
|
8cf159b281 | ||
|
|
4c79576113 | ||
|
|
f87f2ed753 | ||
|
|
298a6c7ca8 | ||
|
|
ab464fff6e | ||
|
|
6dcba16314 | ||
|
|
80a328e87e | ||
|
|
f2034f4975 | ||
|
|
edca56d168 | ||
|
|
076434cc7c | ||
|
|
69d2b97416 | ||
|
|
a7b07e5556 | ||
|
|
ee6a2ae9a3 | ||
|
|
e36925c879 | ||
|
|
df63fc56fc | ||
|
|
eb22d3f6ee | ||
|
|
150145692f | ||
|
|
a900d8f5bb | ||
|
|
3e70aacb9c | ||
|
|
31658a1862 | ||
|
|
694ee7ed86 | ||
|
|
83fb225577 | ||
|
|
1d89f53526 | ||
|
|
6d72804a54 | ||
|
|
26bd7dab97 | ||
|
|
1925d7832c | ||
|
|
379649785d | ||
|
|
302ec94bee | ||
|
|
02f97dfd88 | ||
|
|
ac9f44b766 | ||
|
|
c287304264 | ||
|
|
6510cdfbdc | ||
|
|
298e9f3ab7 | ||
|
|
45291eb2c4 | ||
|
|
ebd47b31b1 | ||
|
|
48a3a3cbbf | ||
|
|
fc7b7738bd | ||
|
|
06ffa6602b | ||
|
|
1336cca81a | ||
|
|
f0ab8a3067 | ||
|
|
3cfd5ebe4d | ||
|
|
f97ad45cef | ||
|
|
2a9a0c7484 | ||
|
|
9eeffa089c | ||
|
|
19df58c6da | ||
|
|
d190522bfd | ||
|
|
cbd48b0075 | ||
|
|
ea1603f051 | ||
|
|
d24f6059d9 | ||
|
|
12c8db40ae | ||
|
|
3660e1a990 | ||
|
|
ca96c7b5dc | ||
|
|
d9bdcc5b20 | ||
|
|
c31fae4cc9 | ||
|
|
87480d81b8 | ||
|
|
251a821322 | ||
|
|
3d0b2b7f01 | ||
|
|
0811258d2e | ||
|
|
aecd4cc5dd | ||
|
|
b1d41f6f47 | ||
|
|
a9d215996b | ||
|
|
812c8b5718 | ||
|
|
bc3d534ba6 | ||
|
|
ef4f1bdd1f | ||
|
|
6bc1e3ec4d | ||
|
|
80d394fd6a | ||
|
|
30c4f11b8a | ||
|
|
7bd21887d1 | ||
|
|
770438eb66 | ||
|
|
a8838102ec | ||
|
|
19161cc078 | ||
|
|
6c48571101 | ||
|
|
c09dafca01 | ||
|
|
3a3dadd8e9 | ||
|
|
68c1abb6f2 | ||
|
|
cfea378104 | ||
|
|
d1badab05b | ||
|
|
581442c427 | ||
|
|
02430a00b5 | ||
|
|
f7c5fd3984 | ||
|
|
3f4b39ec4f | ||
|
|
ddfe637828 | ||
|
|
e09a89ac03 | ||
|
|
bbb5c2a6e0 | ||
|
|
bbf22d8813 | ||
|
|
243522372d | ||
|
|
2a24e29bd9 | ||
|
|
d7d52cba5a | ||
|
|
8319ad7439 | ||
|
|
4996ccdefd | ||
|
|
66889a3d92 | ||
|
|
fc0b52dbd0 | ||
|
|
c7b9e1846e | ||
|
|
fe485243f7 | ||
|
|
8637bb847f | ||
|
|
c8d89dbdd4 | ||
|
|
71e3b19f02 | ||
|
|
5457c216c8 | ||
|
|
aa2d88fcbb | ||
|
|
393faed512 | ||
|
|
0e8e65af7c | ||
|
|
133151377f | ||
|
|
fa2bf8fc5c | ||
|
|
614c7b2226 | ||
|
|
05cb79f4b6 | ||
|
|
278dbd8b82 | ||
|
|
98d1ab57cc | ||
|
|
f2fd9f398d | ||
|
|
b72381e2cb | ||
|
|
14e853ce40 | ||
|
|
7ebf5989a5 | ||
|
|
b70faea505 | ||
|
|
f54ed8a488 | ||
|
|
6a796b0a25 | ||
|
|
ec7e65d794 | ||
|
|
0c5e190350 | ||
|
|
1014cdefeb | ||
|
|
efdc29f30a | ||
|
|
a44b7f78fb | ||
|
|
90eb0ffa4f | ||
|
|
85b5002acf | ||
|
|
4fd66d7781 | ||
|
|
362858e4d7 | ||
|
|
06e4c9f110 | ||
|
|
a71e46169f | ||
|
|
4e1c4b7708 | ||
|
|
75f5348db1 | ||
|
|
5b5b616def | ||
|
|
ec360bd658 | ||
|
|
4f2a37c31f | ||
|
|
90d572ef33 | ||
|
|
ceecab1811 | ||
|
|
25592ec203 | ||
|
|
d935333c5b | ||
|
|
e2571ba523 | ||
|
|
94dc1cea25 | ||
|
|
87bb87bbbc | ||
|
|
47955fc3c3 | ||
|
|
be23ac591c | ||
|
|
578e34ee17 | ||
|
|
05959ee28c | ||
|
|
ae2ce394c9 | ||
|
|
a3fa2051ce | ||
|
|
6fb0858710 | ||
|
|
df94a248e2 | ||
|
|
f826d9ac8e | ||
|
|
8d652d5185 | ||
|
|
c4680836a6 | ||
|
|
77f0f5bb87 | ||
|
|
c68808582b | ||
|
|
3a10a52320 | ||
|
|
0a6bfd1389 | ||
|
|
b7201055a8 | ||
|
|
710f9a3373 | ||
|
|
0402362499 | ||
|
|
15d3caf62c | ||
|
|
4dc1e52b08 | ||
|
|
6f62988135 | ||
|
|
8080bbf964 | ||
|
|
565bee96c9 | ||
|
|
44f93c0b13 | ||
|
|
51f831586a | ||
|
|
93b9932469 | ||
|
|
d10b11ed1f | ||
|
|
f57ab7a828 | ||
|
|
b97f93f2f9 | ||
|
|
8094756601 | ||
|
|
0de5236f8a | ||
|
|
1b7034d154 | ||
|
|
732f1d95d7 | ||
|
|
aa3b118cb5 | ||
|
|
2543ad7216 | ||
|
|
98463335aa | ||
|
|
d46ebe2b4a | ||
|
|
fb96fc2f05 | ||
|
|
910bceb900 | ||
|
|
b6475d8552 | ||
|
|
e136e1ca9a | ||
|
|
0f6ae24b8e | ||
|
|
cc7f1e25e3 | ||
|
|
63dbff1e7a | ||
|
|
9c6b59c362 | ||
|
|
72341b8090 | ||
|
|
0c730843c6 | ||
|
|
05b50c22e3 | ||
|
|
cf4f6554e6 | ||
|
|
7ec1439bb7 | ||
|
|
d1b025253a | ||
|
|
aa272418cf | ||
|
|
ef098c2489 | ||
|
|
c671414958 | ||
|
|
6bb42641a1 | ||
|
|
acca4ddd55 | ||
|
|
e75a4a7500 | ||
|
|
4afa7dc969 | ||
|
|
c953e24931 | ||
|
|
b70545967e | ||
|
|
02302fa54c | ||
|
|
ff8c224554 | ||
|
|
f54c46e238 | ||
|
|
750fa4cc8c | ||
|
|
97b01ab6a4 | ||
|
|
eafaf32938 | ||
|
|
6a4397fdfd | ||
|
|
2109fa8116 | ||
|
|
7de415e54f | ||
|
|
a7307b6a0c | ||
|
|
63613572a5 | ||
|
|
157e942499 | ||
|
|
27d1069acd | ||
|
|
4a8b3d4d7d | ||
|
|
475c8d3ce2 | ||
|
|
093ae3ae39 | ||
|
|
6585d2446a | ||
|
|
2a53f55c3d | ||
|
|
f9b10407f0 | ||
|
|
d63039f7f9 | ||
|
|
3c4c1ed275 | ||
|
|
ce1f8a5cc3 | ||
|
|
c4581d1442 | ||
|
|
5c9bb7a110 | ||
|
|
c145a0224b | ||
|
|
64121eb24d | ||
|
|
fdd7906412 | ||
|
|
ad87583939 | ||
|
|
250ada9689 | ||
|
|
e814c68703 | ||
|
|
932be71d47 | ||
|
|
0c9b5222d6 | ||
|
|
ad52c59f2e | ||
|
|
2e9a1478e8 | ||
|
|
833fa56270 | ||
|
|
34af565257 | ||
|
|
d61d697665 | ||
|
|
d698ef56bf | ||
|
|
b544e257c3 | ||
|
|
2b49c88eab | ||
|
|
82a8a118c0 | ||
|
|
6d9ef2bb38 | ||
|
|
2b3324797b | ||
|
|
281e1ef979 | ||
|
|
3637f4f646 | ||
|
|
dfc0bcbb45 | ||
|
|
c6e01a7ecd | ||
|
|
f60cc48230 | ||
|
|
abc4e16372 | ||
|
|
0e2d5376b7 | ||
|
|
eb8c5ec494 |
@@ -27,11 +27,6 @@ In the meantime, you can move onto the next step...
|
||||
|
||||
- Create a `.env.development.local` file in the `ui` folder and paste the following:
|
||||
|
||||
```bash
|
||||
# This lets the frontend know what the backend URL is but you are free to change this to your actual server URL e.g. hosted version of Kestra.
|
||||
VITE_APP_API_URL=http://localhost:8080
|
||||
```
|
||||
|
||||
- Navigate into the `ui` folder and run `npm install` to install the dependencies for the frontend project.
|
||||
|
||||
- Now go to the `cli/src/main/resources` folder and create a `application-override.yml` file.
|
||||
@@ -74,9 +69,6 @@ kestra:
|
||||
path: /tmp/kestra-wd/tmp
|
||||
anonymous-usage-report:
|
||||
enabled: false
|
||||
server:
|
||||
basic-auth:
|
||||
enabled: false
|
||||
|
||||
datasources:
|
||||
postgres:
|
||||
|
||||
1
.github/CONTRIBUTING.md
vendored
1
.github/CONTRIBUTING.md
vendored
@@ -80,7 +80,6 @@ python3 -m pip install virtualenv
|
||||
The frontend is made with [Vue.js](https://vuejs.org/) and located on the `/ui` folder.
|
||||
|
||||
- `npm install`
|
||||
- create a file `ui/.env.development.local` with content `VITE_APP_API_URL=http://localhost:8080` (or your actual server url)
|
||||
- `npm run dev` will start the development server with hot reload.
|
||||
- The server start by default on port 5173 and is reachable on `http://localhost:5173`
|
||||
- You can run `npm run build` in order to build the front-end that will be delivered from the backend (without running the `npm run dev`) above.
|
||||
|
||||
6
.github/workflows/auto-translate-ui-keys.yml
vendored
6
.github/workflows/auto-translate-ui-keys.yml
vendored
@@ -2,7 +2,7 @@ name: Auto-Translate UI keys and create PR
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 9-21 * * *" # Every hour from 9 AM to 9 PM
|
||||
- cron: "0 9-21/3 * * *" # Every 3 hours from 9 AM to 9 PM
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
retranslate_modified_keys:
|
||||
@@ -20,7 +20,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 10
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
name: Checkout
|
||||
with:
|
||||
fetch-depth: 0
|
||||
@@ -61,7 +61,7 @@ jobs:
|
||||
fi
|
||||
git commit -m "chore(core): localize to languages other than english" -m "Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference."
|
||||
git push -u origin $BRANCH_NAME || (git push origin --delete $BRANCH_NAME && git push -u origin $BRANCH_NAME)
|
||||
gh pr create --title "Translations from en.json" --body "This PR was created automatically by a GitHub Action." --base develop --head $BRANCH_NAME --assignee anna-geller --reviewer anna-geller
|
||||
gh pr create --title "Translations from en.json" --body $'This PR was created automatically by a GitHub Action.\n\nSomeone from the @kestra-io/frontend team needs to review and merge.' --base ${{ github.ref_name }} --head $BRANCH_NAME
|
||||
|
||||
- name: Check keys matching
|
||||
run: node ui/src/translations/check.js
|
||||
|
||||
2
.github/workflows/codeql-analysis.yml
vendored
2
.github/workflows/codeql-analysis.yml
vendored
@@ -27,7 +27,7 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
# We must fetch at least the immediate parents so that if this is
|
||||
# a pull request then we can checkout the head.
|
||||
|
||||
147
.github/workflows/docker.yml
vendored
147
.github/workflows/docker.yml
vendored
@@ -1,147 +0,0 @@
|
||||
name: Create Docker images on Release
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
retag-latest:
|
||||
description: 'Retag latest Docker images'
|
||||
required: true
|
||||
type: string
|
||||
default: "true"
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
release-tag:
|
||||
description: 'Kestra Release Tag'
|
||||
required: false
|
||||
type: string
|
||||
plugin-version:
|
||||
description: 'Plugin version'
|
||||
required: false
|
||||
type: string
|
||||
default: "LATEST"
|
||||
env:
|
||||
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
jobs:
|
||||
plugins:
|
||||
name: List Plugins
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
plugins: ${{ steps.plugins.outputs.plugins }}
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
# Get Plugins List
|
||||
- name: Get Plugins List
|
||||
uses: ./.github/actions/plugins-list
|
||||
id: plugins
|
||||
with:
|
||||
plugin-version: ${{ env.PLUGIN_VERSION }}
|
||||
docker:
|
||||
name: Publish Docker
|
||||
needs: [ plugins ]
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
image:
|
||||
- name: "-no-plugins"
|
||||
plugins: ""
|
||||
packages: jattach
|
||||
python-libs: ""
|
||||
- name: ""
|
||||
plugins: ${{needs.plugins.outputs.plugins}}
|
||||
packages: python3 python-is-python3 python3-pip curl jattach
|
||||
python-libs: kestra
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
# Vars
|
||||
- name: Set image name
|
||||
id: vars
|
||||
run: |
|
||||
if [[ "${{ inputs.release-tag }}" == "" ]]; then
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
else
|
||||
TAG="${{ inputs.release-tag }}"
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
|
||||
echo "plugins=--repositories=https://s01.oss.sonatype.org/content/repositories/snapshots ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
|
||||
else
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
# Download release
|
||||
- name: Download release
|
||||
uses: robinraju/release-downloader@v1.12
|
||||
with:
|
||||
tag: ${{steps.vars.outputs.tag}}
|
||||
fileName: 'kestra-*'
|
||||
out-file-path: build/executable
|
||||
|
||||
- name: Copy exe to image
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
# Docker setup
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Fix Qemu
|
||||
shell: bash
|
||||
run: |
|
||||
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
# Docker Login
|
||||
- name: Login to DockerHub
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
|
||||
# Docker Build and push
|
||||
- name: Push to Docker Hub
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
|
||||
platforms: linux/amd64,linux/arm64
|
||||
build-args: |
|
||||
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
|
||||
APT_PACKAGES=${{ matrix.image.packages }}
|
||||
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
|
||||
|
||||
- name: Install regctl
|
||||
if: github.event.inputs.retag-latest == 'true'
|
||||
uses: regclient/actions/regctl-installer@main
|
||||
|
||||
- name: Retag to latest
|
||||
if: github.event.inputs.retag-latest == 'true'
|
||||
run: |
|
||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
|
||||
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
- docker
|
||||
if: always()
|
||||
env:
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
steps:
|
||||
|
||||
# Slack
|
||||
- name: Slack notification
|
||||
uses: Gamesight/slack-workflow-status@master
|
||||
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
|
||||
with:
|
||||
repo_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
name: GitHub Actions
|
||||
icon_emoji: ':github-actions:'
|
||||
channel: 'C02DQ1A7JLR' # _int_git channel
|
||||
21
.github/workflows/e2e.yml
vendored
21
.github/workflows/e2e.yml
vendored
@@ -19,7 +19,7 @@ on:
|
||||
default: "no input"
|
||||
jobs:
|
||||
check:
|
||||
timeout-minutes: 10
|
||||
timeout-minutes: 15
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
|
||||
@@ -32,10 +32,19 @@ jobs:
|
||||
password: ${{ github.token }}
|
||||
|
||||
- name: Checkout kestra
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
path: kestra
|
||||
|
||||
# Setup build
|
||||
- uses: kestra-io/actions/.github/actions/setup-build@main
|
||||
name: Setup - Build
|
||||
id: build
|
||||
with:
|
||||
java-enabled: true
|
||||
node-enabled: true
|
||||
python-enabled: true
|
||||
|
||||
- name: Install Npm dependencies
|
||||
run: |
|
||||
cd kestra/ui
|
||||
@@ -44,8 +53,8 @@ jobs:
|
||||
|
||||
- name: Run E2E Tests
|
||||
run: |
|
||||
cd kestra/ui
|
||||
npm run test:e2e
|
||||
cd kestra
|
||||
sh build-and-start-e2e-tests.sh
|
||||
|
||||
- name: Upload Playwright Report as Github artifact
|
||||
# 'With this report, you can analyze locally the results of the tests. see https://playwright.dev/docs/ci-intro#html-report'
|
||||
@@ -53,7 +62,7 @@ jobs:
|
||||
if: ${{ !cancelled() }}
|
||||
with:
|
||||
name: playwright-report
|
||||
path: kestra/playwright-report/
|
||||
path: kestra/ui/playwright-report/
|
||||
retention-days: 7
|
||||
# Allure check
|
||||
# TODO I don't know what it should do
|
||||
@@ -74,4 +83,4 @@ jobs:
|
||||
# baseUrl: "https://internal.dev.kestra.io"
|
||||
# prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
|
||||
# copyLatest: true
|
||||
# ignoreMissingResults: true
|
||||
# ignoreMissingResults: true
|
||||
|
||||
4
.github/workflows/gradle-release-plugins.yml
vendored
4
.github/workflows/gradle-release-plugins.yml
vendored
@@ -21,12 +21,12 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
|
||||
4
.github/workflows/gradle-release.yml
vendored
4
.github/workflows/gradle-release.yml
vendored
@@ -33,13 +33,13 @@ jobs:
|
||||
exit 1;
|
||||
fi
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
path: kestra
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
|
||||
10
.github/workflows/main.yml
vendored
10
.github/workflows/main.yml
vendored
@@ -4,9 +4,8 @@ on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
default: 'LATEST'
|
||||
required: true
|
||||
description: "plugins version"
|
||||
required: false
|
||||
type: string
|
||||
push:
|
||||
branches:
|
||||
@@ -34,7 +33,7 @@ jobs:
|
||||
if: "!startsWith(github.ref, 'refs/heads/releases')"
|
||||
uses: ./.github/workflows/workflow-release.yml
|
||||
with:
|
||||
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
|
||||
secrets:
|
||||
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
@@ -43,7 +42,8 @@ jobs:
|
||||
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
|
||||
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
|
||||
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
|
||||
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
|
||||
4
.github/workflows/pull-request.yml
vendored
4
.github/workflows/pull-request.yml
vendored
@@ -56,6 +56,10 @@ jobs:
|
||||
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
|
||||
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
|
||||
|
||||
e2e-tests:
|
||||
name: E2E - Tests
|
||||
uses: ./.github/workflows/e2e.yml
|
||||
|
||||
end:
|
||||
name: End
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
2
.github/workflows/setversion-tag-plugins.yml
vendored
2
.github/workflows/setversion-tag-plugins.yml
vendored
@@ -17,7 +17,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
|
||||
9
.github/workflows/setversion-tag.yml
vendored
9
.github/workflows/setversion-tag.yml
vendored
@@ -34,11 +34,14 @@ jobs:
|
||||
fi
|
||||
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
token: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
|
||||
- name: Configure Git
|
||||
# Configure
|
||||
- name: Git - Configure
|
||||
run: |
|
||||
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
|
||||
git config --global user.name "github-actions[bot]"
|
||||
@@ -54,4 +57,4 @@ jobs:
|
||||
git commit -m"chore(version): update to version '$RELEASE_VERSION'"
|
||||
git push
|
||||
git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
|
||||
git push --tags
|
||||
git push --tags
|
||||
|
||||
16
.github/workflows/vulnerabilities-check.yml
vendored
16
.github/workflows/vulnerabilities-check.yml
vendored
@@ -17,12 +17,12 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
@@ -66,12 +66,12 @@ jobs:
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
@@ -87,7 +87,7 @@ jobs:
|
||||
|
||||
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
|
||||
- name: Docker Vulnerabilities Check
|
||||
uses: aquasecurity/trivy-action@0.31.0
|
||||
uses: aquasecurity/trivy-action@0.32.0
|
||||
with:
|
||||
image-ref: kestra/kestra:develop
|
||||
format: 'template'
|
||||
@@ -111,12 +111,12 @@ jobs:
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
@@ -132,7 +132,7 @@ jobs:
|
||||
|
||||
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
|
||||
- name: Docker Vulnerabilities Check
|
||||
uses: aquasecurity/trivy-action@0.31.0
|
||||
uses: aquasecurity/trivy-action@0.32.0
|
||||
with:
|
||||
image-ref: kestra/kestra:latest
|
||||
format: table
|
||||
|
||||
2
.github/workflows/workflow-backend-test.yml
vendored
2
.github/workflows/workflow-backend-test.yml
vendored
@@ -29,7 +29,7 @@ jobs:
|
||||
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
|
||||
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
name: Checkout - Current ref
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
78
.github/workflows/workflow-build-artifacts.yml
vendored
78
.github/workflows/workflow-build-artifacts.yml
vendored
@@ -1,23 +1,7 @@
|
||||
name: Build Artifacts
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
default: 'LATEST'
|
||||
required: true
|
||||
type: string
|
||||
outputs:
|
||||
docker-tag:
|
||||
value: ${{ jobs.build.outputs.docker-tag }}
|
||||
description: "The Docker image Tag for Kestra"
|
||||
docker-artifact-name:
|
||||
value: ${{ jobs.build.outputs.docker-artifact-name }}
|
||||
description: "The GitHub artifact containing the Kestra docker image name."
|
||||
plugins:
|
||||
value: ${{ jobs.build.outputs.plugins }}
|
||||
description: "The Kestra plugins list used for the build."
|
||||
workflow_call: {}
|
||||
|
||||
jobs:
|
||||
build:
|
||||
@@ -31,7 +15,7 @@ jobs:
|
||||
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
steps:
|
||||
- name: Checkout - Current ref
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
@@ -68,7 +52,7 @@ jobs:
|
||||
if [[ $TAG = "master" || $TAG == v* ]]; then
|
||||
echo "plugins=$PLUGINS" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "plugins=--repositories=https://s01.oss.sonatype.org/content/repositories/snapshots $PLUGINS" >> $GITHUB_OUTPUT
|
||||
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
# Build
|
||||
@@ -82,55 +66,6 @@ jobs:
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
# Docker Tag
|
||||
- name: Setup - Docker vars
|
||||
id: vars
|
||||
shell: bash
|
||||
run: |
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
if [[ $TAG = "master" ]]
|
||||
then
|
||||
TAG="latest";
|
||||
elif [[ $TAG = "develop" ]]
|
||||
then
|
||||
TAG="develop";
|
||||
elif [[ $TAG = v* ]]
|
||||
then
|
||||
TAG="${TAG}";
|
||||
else
|
||||
TAG="build-${{ github.run_id }}";
|
||||
fi
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
echo "artifact=docker-kestra-${TAG}" >> $GITHUB_OUTPUT
|
||||
|
||||
# Docker setup
|
||||
- name: Docker - Setup QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Fix Qemu
|
||||
shell: bash
|
||||
run: |
|
||||
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
|
||||
|
||||
- name: Docker - Setup Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
# Docker Build
|
||||
- name: Docker - Build & export image
|
||||
uses: docker/build-push-action@v6
|
||||
if: "!startsWith(github.ref, 'refs/tags/v')"
|
||||
with:
|
||||
context: .
|
||||
push: false
|
||||
file: Dockerfile
|
||||
tags: |
|
||||
kestra/kestra:${{ steps.vars.outputs.tag }}
|
||||
build-args: |
|
||||
KESTRA_PLUGINS=${{ steps.plugins.outputs.plugins }}
|
||||
APT_PACKAGES=${{ env.DOCKER_APT_PACKAGES }}
|
||||
PYTHON_LIBRARIES=${{ env.DOCKER_PYTHON_LIBRARIES }}
|
||||
outputs: type=docker,dest=/tmp/${{ steps.vars.outputs.artifact }}.tar
|
||||
|
||||
# Upload artifacts
|
||||
- name: Artifacts - Upload JAR
|
||||
uses: actions/upload-artifact@v4
|
||||
@@ -143,10 +78,3 @@ jobs:
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable/
|
||||
|
||||
- name: Artifacts - Upload Docker
|
||||
uses: actions/upload-artifact@v4
|
||||
if: "!startsWith(github.ref, 'refs/tags/v')"
|
||||
with:
|
||||
name: ${{ steps.vars.outputs.artifact }}
|
||||
path: /tmp/${{ steps.vars.outputs.artifact }}.tar
|
||||
|
||||
18
.github/workflows/workflow-frontend-test.yml
vendored
18
.github/workflows/workflow-frontend-test.yml
vendored
@@ -20,7 +20,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
|
||||
- name: Cache Node Modules
|
||||
id: cache-node-modules
|
||||
@@ -68,19 +68,3 @@ jobs:
|
||||
- name: Run storybook component tests
|
||||
working-directory: ui
|
||||
run: npm run test:storybook -- --coverage
|
||||
|
||||
- name: Codecov - Upload coverage reports
|
||||
uses: codecov/codecov-action@v5
|
||||
if: ${{ !cancelled() && github.event.pull_request.head.repo.full_name == github.repository }}
|
||||
continue-on-error: true
|
||||
with:
|
||||
token: ${{ secrets.CODECOV_TOKEN }}
|
||||
flags: frontend
|
||||
|
||||
- name: Codecov - Upload test results
|
||||
uses: codecov/test-results-action@v1
|
||||
if: ${{ !cancelled() }}
|
||||
continue-on-error: true
|
||||
with:
|
||||
token: ${{ secrets.CODECOV_TOKEN && github.event.pull_request.head.repo.full_name == github.repository }}
|
||||
flags: frontend
|
||||
37
.github/workflows/workflow-github-release.yml
vendored
37
.github/workflows/workflow-github-release.yml
vendored
@@ -1,14 +1,17 @@
|
||||
name: Github - Release
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
workflow_call:
|
||||
secrets:
|
||||
GH_PERSONAL_TOKEN:
|
||||
description: "The Github personal token."
|
||||
required: true
|
||||
push:
|
||||
tags:
|
||||
- '*'
|
||||
SLACK_RELEASES_WEBHOOK_URL:
|
||||
description: "The Slack webhook URL."
|
||||
required: true
|
||||
|
||||
|
||||
|
||||
jobs:
|
||||
publish:
|
||||
@@ -17,14 +20,14 @@ jobs:
|
||||
steps:
|
||||
# Check out
|
||||
- name: Checkout - Repository
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
submodules: true
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- name: Checkout - Actions
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
sparse-checkout-cone-mode: true
|
||||
@@ -35,18 +38,31 @@ jobs:
|
||||
# Download Exec
|
||||
# Must be done after checkout actions
|
||||
- name: Artifacts - Download executable
|
||||
uses: actions/download-artifact@v4
|
||||
uses: actions/download-artifact@v5
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable
|
||||
|
||||
- name: Check if current tag is latest
|
||||
id: is_latest
|
||||
run: |
|
||||
latest_tag=$(git tag | grep -E '^v[0-9]+\.[0-9]+\.[0-9]+$' | sed 's/^v//' | sort -V | tail -n1)
|
||||
current_tag="${GITHUB_REF_NAME#v}"
|
||||
if [ "$current_tag" = "$latest_tag" ]; then
|
||||
echo "latest=true" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "latest=false" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
env:
|
||||
GITHUB_REF_NAME: ${{ github.ref_name }}
|
||||
|
||||
# GitHub Release
|
||||
- name: Create GitHub release
|
||||
uses: ./actions/.github/actions/github-release
|
||||
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
|
||||
env:
|
||||
MAKE_LATEST: ${{ steps.is_latest.outputs.latest }}
|
||||
GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
|
||||
|
||||
@@ -62,4 +78,11 @@ jobs:
|
||||
"new_version": "${{ github.ref_name }}",
|
||||
"github_repository": "${{ github.repository }}",
|
||||
"github_actor": "${{ github.actor }}"
|
||||
}
|
||||
}
|
||||
|
||||
- name: Merge Release Notes
|
||||
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
|
||||
uses: ./actions/.github/actions/github-release-note-merge
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
RELEASE_TAG: ${{ github.ref_name }}
|
||||
|
||||
210
.github/workflows/workflow-publish-docker.yml
vendored
210
.github/workflows/workflow-publish-docker.yml
vendored
@@ -1,22 +1,37 @@
|
||||
name: Publish - Docker
|
||||
name: Create Docker images on Release
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
default: 'LATEST'
|
||||
retag-latest:
|
||||
description: 'Retag latest Docker images'
|
||||
required: true
|
||||
type: choice
|
||||
default: "false"
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
release-tag:
|
||||
description: 'Kestra Release Tag (by default, deduced with the ref)'
|
||||
required: false
|
||||
type: string
|
||||
plugin-version:
|
||||
description: 'Plugin version'
|
||||
required: false
|
||||
type: string
|
||||
default: "LATEST"
|
||||
force-download-artifact:
|
||||
description: 'Force download artifact'
|
||||
required: false
|
||||
type: string
|
||||
type: choice
|
||||
default: "true"
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
workflow_call:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
description: "Plugin version"
|
||||
default: 'LATEST'
|
||||
required: false
|
||||
type: string
|
||||
@@ -33,47 +48,93 @@ on:
|
||||
description: "The Dockerhub password."
|
||||
required: true
|
||||
|
||||
env:
|
||||
PLUGIN_VERSION: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
|
||||
jobs:
|
||||
plugins:
|
||||
name: List Plugins
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
plugins: ${{ steps.plugins.outputs.plugins }}
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
|
||||
# Get Plugins List
|
||||
- name: Get Plugins List
|
||||
uses: ./.github/actions/plugins-list
|
||||
id: plugins
|
||||
with: # remap LATEST-SNAPSHOT to LATEST
|
||||
plugin-version: ${{ env.PLUGIN_VERSION == 'LATEST-SNAPSHOT' && 'LATEST' || env.PLUGIN_VERSION }}
|
||||
|
||||
# ********************************************************************************************************************
|
||||
# Build
|
||||
# ********************************************************************************************************************
|
||||
build-artifacts:
|
||||
name: Build Artifacts
|
||||
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
|
||||
if: ${{ inputs.force-download-artifact == 'true' }}
|
||||
uses: ./.github/workflows/workflow-build-artifacts.yml
|
||||
with:
|
||||
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
# ********************************************************************************************************************
|
||||
# Docker
|
||||
# ********************************************************************************************************************
|
||||
publish:
|
||||
name: Publish - Docker
|
||||
|
||||
docker:
|
||||
name: Publish Docker
|
||||
needs: [ plugins, build-artifacts ]
|
||||
if: always()
|
||||
runs-on: ubuntu-latest
|
||||
needs: build-artifacts
|
||||
if: |
|
||||
always() &&
|
||||
(needs.build-artifacts.result == 'success' ||
|
||||
github.event.inputs.force-download-artifact != 'true')
|
||||
env:
|
||||
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
strategy:
|
||||
matrix:
|
||||
image:
|
||||
- tag: -no-plugins
|
||||
- name: "-no-plugins"
|
||||
plugins: ""
|
||||
packages: jattach
|
||||
plugins: false
|
||||
python-libraries: ""
|
||||
|
||||
- tag: ""
|
||||
plugins: true
|
||||
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
|
||||
python-libraries: kestra
|
||||
python-libs: ""
|
||||
- name: ""
|
||||
plugins: ${{needs.plugins.outputs.plugins}}
|
||||
packages: python3 python-is-python3 python3-pip curl jattach
|
||||
python-libs: kestra
|
||||
steps:
|
||||
- name: Checkout - Current ref
|
||||
uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
|
||||
# Vars
|
||||
- name: Set image name
|
||||
id: vars
|
||||
run: |
|
||||
if [[ "${{ inputs.release-tag }}" == "" ]]; then
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
else
|
||||
TAG="${{ inputs.release-tag }}"
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
if [[ $GITHUB_REF == refs/tags/* ]]; then
|
||||
if [[ $TAG =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
|
||||
# this will remove the patch version number
|
||||
MINOR_SEMVER=${TAG%.*}
|
||||
echo "minor_semver=${MINOR_SEMVER}" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "Tag '$TAG' is not a valid semver (vMAJOR.MINOR.PATCH), skipping minor_semver"
|
||||
fi
|
||||
fi
|
||||
|
||||
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
|
||||
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
|
||||
else
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
# Download executable from artifact
|
||||
- name: Artifacts - Download executable
|
||||
uses: actions/download-artifact@v5
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable
|
||||
|
||||
- name: Copy exe to image
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
# Docker setup
|
||||
- name: Docker - Setup QEMU
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Fix Qemu
|
||||
@@ -81,66 +142,59 @@ jobs:
|
||||
run: |
|
||||
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
|
||||
|
||||
- name: Docker - Setup Docker Buildx
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
# Docker Login
|
||||
- name: Docker - Login to DockerHub
|
||||
- name: Login to DockerHub
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
|
||||
|
||||
# # Get Plugins List
|
||||
- name: Plugins - Get List
|
||||
uses: ./.github/actions/plugins-list
|
||||
id: plugins-list
|
||||
if: ${{ matrix.image.plugins}}
|
||||
with:
|
||||
plugin-version: ${{ env.PLUGIN_VERSION }}
|
||||
|
||||
# Vars
|
||||
- name: Docker - Set variables
|
||||
shell: bash
|
||||
id: vars
|
||||
run: |
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
PLUGINS="${{ matrix.image.plugins == true && steps.plugins-list.outputs.plugins || '' }}"
|
||||
if [[ $TAG == v* ]]; then
|
||||
TAG="${TAG}";
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
elif [[ $TAG = "develop" ]]; then
|
||||
TAG="develop";
|
||||
echo "plugins=--repositories=https://s01.oss.sonatype.org/content/repositories/snapshots $PLUGINS" >> $GITHUB_OUTPUT
|
||||
else
|
||||
TAG="build-${{ github.run_id }}";
|
||||
echo "plugins=--repositories=https://s01.oss.sonatype.org/content/repositories/snapshots $PLUGINS" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
echo "tag=${TAG}${{ matrix.image.tag }}" >> $GITHUB_OUTPUT
|
||||
|
||||
# Build Docker Image
|
||||
- name: Artifacts - Download executable
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable
|
||||
|
||||
- name: Docker - Copy exe to image
|
||||
shell: bash
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
# Docker Build and push
|
||||
- name: Docker - Build image
|
||||
- name: Push to Docker Hub
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
tags: kestra/kestra:${{ steps.vars.outputs.tag }}
|
||||
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
|
||||
platforms: linux/amd64,linux/arm64
|
||||
build-args: |
|
||||
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
|
||||
APT_PACKAGES=${{ matrix.image.packages }}
|
||||
PYTHON_LIBRARIES=${{ matrix.image.python-libraries }}
|
||||
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
|
||||
|
||||
- name: Install regctl
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
uses: regclient/actions/regctl-installer@main
|
||||
|
||||
- name: Retag to minor semver version
|
||||
if: startsWith(github.ref, 'refs/tags/v') && steps.vars.outputs.minor_semver != ''
|
||||
run: |
|
||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.minor_semver, matrix.image.name) }}
|
||||
|
||||
- name: Retag to latest
|
||||
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-latest == 'true'
|
||||
run: |
|
||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
|
||||
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
- docker
|
||||
if: always()
|
||||
env:
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
steps:
|
||||
|
||||
# Slack
|
||||
- name: Slack notification
|
||||
uses: Gamesight/slack-workflow-status@master
|
||||
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
|
||||
with:
|
||||
repo_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
name: GitHub Actions
|
||||
icon_emoji: ':github-actions:'
|
||||
channel: 'C02DQ1A7JLR' # _int_git channel
|
||||
2
.github/workflows/workflow-publish-maven.yml
vendored
2
.github/workflows/workflow-publish-maven.yml
vendored
@@ -25,7 +25,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout - Current ref
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
|
||||
# Setup build
|
||||
- name: Setup - Build
|
||||
|
||||
19
.github/workflows/workflow-release.yml
vendored
19
.github/workflows/workflow-release.yml
vendored
@@ -4,7 +4,7 @@ on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
description: "plugins version"
|
||||
default: 'LATEST'
|
||||
required: false
|
||||
type: string
|
||||
@@ -16,7 +16,7 @@ on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
description: "plugins version"
|
||||
default: 'LATEST'
|
||||
required: false
|
||||
type: string
|
||||
@@ -42,21 +42,25 @@ on:
|
||||
SONATYPE_GPG_FILE:
|
||||
description: "The Sonatype GPG file."
|
||||
required: true
|
||||
GH_PERSONAL_TOKEN:
|
||||
description: "GH personnal Token."
|
||||
required: true
|
||||
SLACK_RELEASES_WEBHOOK_URL:
|
||||
description: "Slack webhook for releases channel."
|
||||
required: true
|
||||
jobs:
|
||||
build-artifacts:
|
||||
name: Build - Artifacts
|
||||
uses: ./.github/workflows/workflow-build-artifacts.yml
|
||||
with:
|
||||
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
|
||||
Docker:
|
||||
name: Publish Docker
|
||||
needs: build-artifacts
|
||||
uses: ./.github/workflows/workflow-publish-docker.yml
|
||||
if: startsWith(github.ref, 'refs/heads/develop') || github.event.inputs.publish-docker == 'true'
|
||||
if: github.ref == 'refs/heads/develop' || inputs.publish-docker == 'true'
|
||||
with:
|
||||
force-download-artifact: 'false'
|
||||
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
plugin-version: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
|
||||
secrets:
|
||||
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
@@ -77,4 +81,5 @@ jobs:
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
uses: ./.github/workflows/workflow-github-release.yml
|
||||
secrets:
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
|
||||
2
.github/workflows/workflow-test.yml
vendored
2
.github/workflows/workflow-test.yml
vendored
@@ -27,7 +27,7 @@ jobs:
|
||||
ui: ${{ steps.changes.outputs.ui }}
|
||||
backend: ${{ steps.changes.outputs.backend }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
if: "!startsWith(github.ref, 'refs/tags/v')"
|
||||
- uses: dorny/paths-filter@v3
|
||||
if: "!startsWith(github.ref, 'refs/tags/v')"
|
||||
|
||||
12
.plugins
12
.plugins
@@ -3,10 +3,12 @@
|
||||
# Format: <RepositoryName>:<GroupId>:<ArtifactId>:<Version>
|
||||
#
|
||||
# Uncomment the lines corresponding to the plugins to be installed:
|
||||
#plugin-ai:io.kestra.plugin:plugin-ai:LATEST
|
||||
#plugin-airbyte:io.kestra.plugin:plugin-airbyte:LATEST
|
||||
#plugin-airflow:io.kestra.plugin:plugin-airflow:LATEST
|
||||
#plugin-amqp:io.kestra.plugin:plugin-amqp:LATEST
|
||||
#plugin-ansible:io.kestra.plugin:plugin-ansible:LATEST
|
||||
#plugin-anthropic:io.kestra.plugin:plugin-anthropic:LATEST
|
||||
#plugin-aws:io.kestra.plugin:plugin-aws:LATEST
|
||||
#plugin-azure:io.kestra.plugin:plugin-azure:LATEST
|
||||
#plugin-cassandra:io.kestra.plugin:plugin-cassandra:LATEST
|
||||
@@ -24,6 +26,7 @@
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-oracle:LATEST
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-postgres:LATEST
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-sqlserver:LATEST
|
||||
#plugin-deepseek:io.kestra.plugin:plugin-deepseek:LATEST
|
||||
#plugin-docker:io.kestra.plugin:plugin-docker:LATEST
|
||||
#plugin-elasticsearch:io.kestra.plugin:plugin-elasticsearch:LATEST
|
||||
#plugin-fivetran:io.kestra.plugin:plugin-fivetran:LATEST
|
||||
@@ -64,31 +67,38 @@
|
||||
#plugin-kafka:io.kestra.plugin:plugin-kafka:LATEST
|
||||
#plugin-kestra:io.kestra.plugin:plugin-kestra:LATEST
|
||||
#plugin-kubernetes:io.kestra.plugin:plugin-kubernetes:LATEST
|
||||
#plugin-langchain4j:io.kestra.plugin:plugin-langchain4j:LATEST
|
||||
#plugin-ldap:io.kestra.plugin:plugin-ldap:LATEST
|
||||
#plugin-linear:io.kestra.plugin:plugin-linear:LATEST
|
||||
#plugin-malloy:io.kestra.plugin:plugin-malloy:LATEST
|
||||
#plugin-meilisearch:io.kestra.plugin:plugin-meilisearch:LATEST
|
||||
#plugin-minio:io.kestra.plugin:plugin-minio:LATEST
|
||||
#plugin-mistral:io.kestra.plugin:plugin-mistral:LATEST
|
||||
#plugin-modal:io.kestra.plugin:plugin-modal:LATEST
|
||||
#plugin-mongodb:io.kestra.plugin:plugin-mongodb:LATEST
|
||||
#plugin-mqtt:io.kestra.plugin:plugin-mqtt:LATEST
|
||||
#plugin-nats:io.kestra.plugin:plugin-nats:LATEST
|
||||
#plugin-neo4j:io.kestra.plugin:plugin-neo4j:LATEST
|
||||
#plugin-notifications:io.kestra.plugin:plugin-notifications:LATEST
|
||||
#plugin-notion:io.kestra.plugin:plugin-notion:LATEST
|
||||
#plugin-ollama:io.kestra.plugin:plugin-ollama:LATEST
|
||||
#plugin-openai:io.kestra.plugin:plugin-openai:LATEST
|
||||
#plugin-opensearch:io.kestra.plugin:plugin-opensearch:LATEST
|
||||
#plugin-perplexity:io.kestra.plugin:plugin-perplexity:LATEST
|
||||
#plugin-powerbi:io.kestra.plugin:plugin-powerbi:LATEST
|
||||
#plugin-pulsar:io.kestra.plugin:plugin-pulsar:LATEST
|
||||
#plugin-redis:io.kestra.plugin:plugin-redis:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-bun:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-deno:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-go:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-groovy:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-jbang:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-julia:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-jython:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-lua:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-nashorn:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-node:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-perl:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-php:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-powershell:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-python:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-r:LATEST
|
||||
|
||||
5
Makefile
5
Makefile
@@ -77,7 +77,7 @@ install-plugins:
|
||||
else \
|
||||
${KESTRA_BASEDIR}/bin/kestra plugins install $$CURRENT_PLUGIN \
|
||||
--plugins ${KESTRA_BASEDIR}/plugins \
|
||||
--repositories=https://s01.oss.sonatype.org/content/repositories/snapshots || exit 1; \
|
||||
--repositories=https://central.sonatype.com/repository/maven-snapshots || exit 1; \
|
||||
fi \
|
||||
done < $$PLUGIN_LIST
|
||||
|
||||
@@ -130,9 +130,6 @@ datasources:
|
||||
username: kestra
|
||||
password: k3str4
|
||||
kestra:
|
||||
server:
|
||||
basic-auth:
|
||||
enabled: false
|
||||
encryption:
|
||||
secret-key: 3ywuDa/Ec61VHkOX3RlI9gYq7CaD0mv0Pf3DHtAXA6U=
|
||||
repository:
|
||||
|
||||
46
build-and-start-e2e-tests.sh
Executable file
46
build-and-start-e2e-tests.sh
Executable file
@@ -0,0 +1,46 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
# E2E main script that can be run on a dev computer or in the CI
|
||||
# it will build the backend of the current git repo and the frontend
|
||||
# create a docker image out of it
|
||||
# run tests on this image
|
||||
|
||||
|
||||
LOCAL_IMAGE_VERSION="local-e2e"
|
||||
|
||||
echo "Running E2E"
|
||||
echo "Start time: $(date '+%Y-%m-%d %H:%M:%S')"
|
||||
start_time=$(date +%s)
|
||||
|
||||
echo ""
|
||||
echo "Building the image for this current repository"
|
||||
make build-docker VERSION=$LOCAL_IMAGE_VERSION
|
||||
|
||||
end_time=$(date +%s)
|
||||
elapsed=$(( end_time - start_time ))
|
||||
|
||||
echo ""
|
||||
echo "building elapsed time: ${elapsed} seconds"
|
||||
echo ""
|
||||
echo "Start time: $(date '+%Y-%m-%d %H:%M:%S')"
|
||||
start_time2=$(date +%s)
|
||||
|
||||
echo "cd ./ui"
|
||||
cd ./ui
|
||||
echo "npm i"
|
||||
npm i
|
||||
|
||||
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
|
||||
sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
|
||||
|
||||
end_time2=$(date +%s)
|
||||
elapsed2=$(( end_time2 - start_time2 ))
|
||||
echo ""
|
||||
echo "Tests elapsed time: ${elapsed2} seconds"
|
||||
echo ""
|
||||
total_elapsed=$(( elapsed + elapsed2 ))
|
||||
echo "Total elapsed time: ${total_elapsed} seconds"
|
||||
echo ""
|
||||
|
||||
exit 0
|
||||
15
build.gradle
15
build.gradle
@@ -32,9 +32,9 @@ plugins {
|
||||
|
||||
// release
|
||||
id 'net.researchgate.release' version '3.1.0'
|
||||
id "com.gorylenko.gradle-git-properties" version "2.5.0"
|
||||
id "com.gorylenko.gradle-git-properties" version "2.5.2"
|
||||
id 'signing'
|
||||
id "com.vanniktech.maven.publish" version "0.33.0"
|
||||
id "com.vanniktech.maven.publish" version "0.34.0"
|
||||
|
||||
// OWASP dependency check
|
||||
id "org.owasp.dependencycheck" version "12.1.3" apply false
|
||||
@@ -71,6 +71,11 @@ dependencies {
|
||||
* Dependencies
|
||||
**********************************************************************************************************************/
|
||||
allprojects {
|
||||
|
||||
tasks.withType(GenerateModuleMetadata).configureEach {
|
||||
suppressedValidationErrors.add('enforced-platform')
|
||||
}
|
||||
|
||||
if (it.name != 'platform') {
|
||||
group = "io.kestra"
|
||||
|
||||
@@ -143,6 +148,7 @@ allprojects {
|
||||
implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-parameter-names'
|
||||
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-guava'
|
||||
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310'
|
||||
implementation group: 'com.fasterxml.uuid', name: 'java-uuid-generator'
|
||||
|
||||
// kestra
|
||||
implementation group: 'com.devskiller.friendly-id', name: 'friendly-id'
|
||||
@@ -614,11 +620,6 @@ subprojects {subProject ->
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tasks.withType(GenerateModuleMetadata).configureEach {
|
||||
// Suppression this validation error as we want to enforce the Kestra platform
|
||||
suppressedValidationErrors.add('enforced-platform')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
package io.kestra.cli;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.micronaut.http.HttpHeaders;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
@@ -16,16 +14,15 @@ import io.micronaut.http.netty.body.NettyJsonHandler;
|
||||
import io.micronaut.json.JsonMapper;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Named;
|
||||
import lombok.Builder;
|
||||
import lombok.Value;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import lombok.Builder;
|
||||
import lombok.Value;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
import picocli.CommandLine;
|
||||
|
||||
public abstract class AbstractApiCommand extends AbstractCommand {
|
||||
@CommandLine.Option(names = {"--server"}, description = "Kestra server url", defaultValue = "http://localhost:8080")
|
||||
@@ -37,7 +34,7 @@ public abstract class AbstractApiCommand extends AbstractCommand {
|
||||
@CommandLine.Option(names = {"--user"}, paramLabel = "<user:password>", description = "Server user and password")
|
||||
protected String user;
|
||||
|
||||
@CommandLine.Option(names = {"--tenant"}, description = "Tenant identifier (EE only, when multi-tenancy is enabled)")
|
||||
@CommandLine.Option(names = {"--tenant"}, description = "Tenant identifier (EE only)")
|
||||
protected String tenantId;
|
||||
|
||||
@CommandLine.Option(names = {"--api-token"}, description = "API Token (EE only).")
|
||||
@@ -87,12 +84,12 @@ public abstract class AbstractApiCommand extends AbstractCommand {
|
||||
return request;
|
||||
}
|
||||
|
||||
protected String apiUri(String path) {
|
||||
protected String apiUri(String path, String tenantId) {
|
||||
if (path == null || !path.startsWith("/")) {
|
||||
throw new IllegalArgumentException("'path' must be non-null and start with '/'");
|
||||
}
|
||||
|
||||
return tenantId == null ? "/api/v1/" + MAIN_TENANT + path : "/api/v1/" + tenantId + path;
|
||||
return "/api/v1/" + tenantId + path;
|
||||
}
|
||||
|
||||
@Builder
|
||||
|
||||
@@ -40,7 +40,7 @@ import picocli.CommandLine.Option;
|
||||
)
|
||||
@Slf4j
|
||||
@Introspected
|
||||
abstract public class AbstractCommand implements Callable<Integer> {
|
||||
public abstract class AbstractCommand implements Callable<Integer> {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@@ -93,7 +93,7 @@ abstract public class AbstractCommand implements Callable<Integer> {
|
||||
this.startupHook.start(this);
|
||||
}
|
||||
|
||||
if (this.pluginsPath != null && loadExternalPlugins()) {
|
||||
if (pluginRegistryProvider != null && this.pluginsPath != null && loadExternalPlugins()) {
|
||||
pluginRegistry = pluginRegistryProvider.get();
|
||||
pluginRegistry.registerIfAbsent(pluginsPath);
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.cli;
|
||||
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
import io.kestra.core.models.validations.ValidateConstraintViolation;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
@@ -9,6 +10,7 @@ import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -31,6 +33,9 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
|
||||
@CommandLine.Parameters(index = "0", description = "the directory containing files to check")
|
||||
protected Path directory;
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
/** {@inheritDoc} **/
|
||||
@Override
|
||||
protected boolean loadExternalPlugins() {
|
||||
@@ -112,7 +117,7 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
|
||||
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.POST(apiUri("/flows/validate"), body).contentType(MediaType.APPLICATION_YAML);
|
||||
.POST(apiUri("/flows/validate", tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
|
||||
|
||||
List<ValidateConstraintViolation> validations = client.toBlocking().retrieve(
|
||||
this.requestOptions(request),
|
||||
|
||||
@@ -66,8 +66,14 @@ public class App implements Callable<Integer> {
|
||||
ApplicationContext applicationContext = App.applicationContext(cls, args);
|
||||
|
||||
// Call Picocli command
|
||||
int exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
|
||||
|
||||
int exitCode = 0;
|
||||
try {
|
||||
exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
|
||||
} catch (CommandLine.InitializationException e){
|
||||
System.err.println("Could not initialize picoli ComandLine, err: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
exitCode = 1;
|
||||
}
|
||||
applicationContext.close();
|
||||
|
||||
// exit code
|
||||
|
||||
@@ -2,11 +2,13 @@ package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@@ -23,6 +25,9 @@ public class FlowCreateCommand extends AbstractApiCommand {
|
||||
@CommandLine.Parameters(index = "0", description = "The file containing the flow")
|
||||
public Path flowFile;
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
@@ -34,7 +39,7 @@ public class FlowCreateCommand extends AbstractApiCommand {
|
||||
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.POST(apiUri("/flows"), body).contentType(MediaType.APPLICATION_YAML);
|
||||
.POST(apiUri("/flows", tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
|
||||
|
||||
client.toBlocking().retrieve(
|
||||
this.requestOptions(request),
|
||||
|
||||
@@ -2,10 +2,12 @@ package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@@ -23,6 +25,9 @@ public class FlowDeleteCommand extends AbstractApiCommand {
|
||||
@CommandLine.Parameters(index = "1", description = "The ID of the flow")
|
||||
public String id;
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
@@ -30,7 +35,7 @@ public class FlowDeleteCommand extends AbstractApiCommand {
|
||||
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.DELETE(apiUri("/flows/" + namespace + "/" + id ));
|
||||
.DELETE(apiUri("/flows/" + namespace + "/" + id, tenantService.getTenantId(tenantId)));
|
||||
|
||||
client.toBlocking().exchange(
|
||||
this.requestOptions(request)
|
||||
|
||||
@@ -2,7 +2,7 @@ package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.HttpResponse;
|
||||
import io.micronaut.http.MediaType;
|
||||
@@ -25,9 +25,8 @@ import java.nio.file.Path;
|
||||
public class FlowExportCommand extends AbstractApiCommand {
|
||||
private static final String DEFAULT_FILE_NAME = "flows.zip";
|
||||
|
||||
// @FIXME: Keep it for bug in micronaut that need to have inject on top level command to inject on abstract classe
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of flows to export")
|
||||
public String namespace;
|
||||
@@ -41,7 +40,7 @@ public class FlowExportCommand extends AbstractApiCommand {
|
||||
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<Object> request = HttpRequest
|
||||
.GET(apiUri("/flows/export/by-query") + (namespace != null ? "?namespace=" + namespace : ""))
|
||||
.GET(apiUri("/flows/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
|
||||
.accept(MediaType.APPLICATION_OCTET_STREAM);
|
||||
|
||||
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
|
||||
@@ -30,7 +31,7 @@ import java.util.concurrent.TimeoutException;
|
||||
description = "Test a flow"
|
||||
)
|
||||
@Slf4j
|
||||
public class FlowTestCommand extends AbstractCommand {
|
||||
public class FlowTestCommand extends AbstractApiCommand {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@@ -76,6 +77,7 @@ public class FlowTestCommand extends AbstractCommand {
|
||||
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
FlowInputOutput flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
|
||||
RunnerUtils runnerUtils = applicationContext.getBean(RunnerUtils.class);
|
||||
TenantIdSelectorService tenantService = applicationContext.getBean(TenantIdSelectorService.class);
|
||||
|
||||
Map<String, Object> inputs = new HashMap<>();
|
||||
|
||||
@@ -89,7 +91,7 @@ public class FlowTestCommand extends AbstractCommand {
|
||||
|
||||
try {
|
||||
runner.run();
|
||||
repositoryLoader.load(file.toFile());
|
||||
repositoryLoader.load(tenantService.getTenantId(tenantId), file.toFile());
|
||||
|
||||
List<Flow> all = flowRepository.findAllForAllTenants();
|
||||
if (all.size() != 1) {
|
||||
|
||||
@@ -2,11 +2,13 @@ package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@@ -29,6 +31,9 @@ public class FlowUpdateCommand extends AbstractApiCommand {
|
||||
@CommandLine.Parameters(index = "2", description = "The ID of the flow")
|
||||
public String id;
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
@@ -40,7 +45,7 @@ public class FlowUpdateCommand extends AbstractApiCommand {
|
||||
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.PUT(apiUri("/flows/" + namespace + "/" + id ), body).contentType(MediaType.APPLICATION_YAML);
|
||||
.PUT(apiUri("/flows/" + namespace + "/" + id, tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
|
||||
|
||||
client.toBlocking().retrieve(
|
||||
this.requestOptions(request),
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import io.micronaut.core.type.Argument;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
@@ -9,6 +10,7 @@ import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
@@ -36,6 +38,9 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
|
||||
@CommandLine.Option(names = {"--namespace"}, description = "The parent namespace of the flows, if not set, every namespace are allowed.")
|
||||
public String namespace;
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantIdSelectorService;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
@@ -66,7 +71,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
|
||||
namespaceQuery = "&namespace=" + namespace;
|
||||
}
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.POST(apiUri("/flows/bulk") + "?allowNamespaceChild=true&delete=" + delete + namespaceQuery, body).contentType(MediaType.APPLICATION_YAML);
|
||||
.POST(apiUri("/flows/bulk", tenantIdSelectorService.getTenantId(tenantId)) + "?allowNamespaceChild=true&delete=" + delete + namespaceQuery, body).contentType(MediaType.APPLICATION_YAML);
|
||||
|
||||
List<UpdateResult> updated = client.toBlocking().retrieve(
|
||||
this.requestOptions(request),
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.validations.ModelValidator;
|
||||
import io.kestra.core.services.FlowService;
|
||||
@@ -22,6 +23,9 @@ public class FlowValidateCommand extends AbstractValidateCommand {
|
||||
@Inject
|
||||
private FlowService flowService;
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
return this.call(
|
||||
@@ -35,7 +39,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
|
||||
FlowWithSource flow = (FlowWithSource) object;
|
||||
List<String> warnings = new ArrayList<>();
|
||||
warnings.addAll(flowService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
|
||||
warnings.addAll(flowService.warnings(flow, this.tenantId));
|
||||
warnings.addAll(flowService.warnings(flow, tenantService.getTenantId(tenantId)));
|
||||
return warnings;
|
||||
},
|
||||
(Object object) -> {
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.kestra.cli.commands.flows.namespaces;
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
|
||||
import io.kestra.cli.commands.flows.IncludeHelperExpander;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
import io.micronaut.core.type.Argument;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
@@ -10,6 +11,7 @@ import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
@@ -30,6 +32,9 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
|
||||
@CommandLine.Option(names = {"--override-namespaces"}, negatable = true, description = "Replace namespace of all flows by the one provided")
|
||||
public boolean override = false;
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
@@ -59,7 +64,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
|
||||
}
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.POST(apiUri("/flows/") + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
|
||||
.POST(apiUri("/flows/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
|
||||
|
||||
List<UpdateResult> updated = client.toBlocking().retrieve(
|
||||
this.requestOptions(request),
|
||||
|
||||
@@ -2,12 +2,14 @@ package io.kestra.cli.commands.namespaces.files;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.utils.KestraIgnore;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.multipart.MultipartBody;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@@ -34,6 +36,9 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
|
||||
@CommandLine.Option(names = {"--delete"}, negatable = true, description = "Whether missing should be deleted")
|
||||
public boolean delete = false;
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
private static final String KESTRA_IGNORE_FILE = ".kestraignore";
|
||||
|
||||
@Override
|
||||
@@ -44,7 +49,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
|
||||
|
||||
try (var files = Files.walk(from); DefaultHttpClient client = client()) {
|
||||
if (delete) {
|
||||
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/") + namespace + "/files?path=" + to, null)));
|
||||
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + to, null)));
|
||||
}
|
||||
|
||||
KestraIgnore kestraIgnore = new KestraIgnore(from);
|
||||
@@ -62,7 +67,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
|
||||
client.toBlocking().exchange(
|
||||
this.requestOptions(
|
||||
HttpRequest.POST(
|
||||
apiUri("/namespaces/") + namespace + "/files?path=" + destination,
|
||||
apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + destination,
|
||||
body
|
||||
).contentType(MediaType.MULTIPART_FORM_DATA)
|
||||
)
|
||||
|
||||
@@ -3,11 +3,13 @@ package io.kestra.cli.commands.namespaces.kv;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MediaType;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
import picocli.CommandLine.Option;
|
||||
@@ -42,6 +44,9 @@ public class KvUpdateCommand extends AbstractApiCommand {
|
||||
@Option(names = {"-f", "--file-value"}, description = "The file from which to read the value to set. If this is provided, it will take precedence over any specified value.")
|
||||
public Path fileValue;
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
@@ -56,7 +61,7 @@ public class KvUpdateCommand extends AbstractApiCommand {
|
||||
|
||||
Duration ttl = expiration == null ? null : Duration.parse(expiration);
|
||||
MutableHttpRequest<String> request = HttpRequest
|
||||
.PUT(apiUri("/namespaces/") + namespace + "/kv/" + key, value)
|
||||
.PUT(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/kv/" + key, value)
|
||||
.contentType(MediaType.APPLICATION_JSON_TYPE);
|
||||
|
||||
if (ttl != null) {
|
||||
|
||||
@@ -18,6 +18,8 @@ import java.nio.file.Paths;
|
||||
import java.util.Base64;
|
||||
import java.util.List;
|
||||
|
||||
import static io.kestra.core.models.Plugin.isDeprecated;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "doc",
|
||||
description = "Generate documentation for all plugins currently installed"
|
||||
@@ -38,6 +40,9 @@ public class PluginDocCommand extends AbstractCommand {
|
||||
@CommandLine.Option(names = {"--schema"}, description = "Also write JSON Schema for each task")
|
||||
private boolean schema = false;
|
||||
|
||||
@CommandLine.Option(names = {"--skip-deprecated"},description = "Skip deprecated plugins when generating documentations")
|
||||
private boolean skipDeprecated = false;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
@@ -45,6 +50,11 @@ public class PluginDocCommand extends AbstractCommand {
|
||||
|
||||
PluginRegistry registry = pluginRegistryProvider.get();
|
||||
List<RegisteredPlugin> plugins = core ? registry.plugins() : registry.externalPlugins();
|
||||
if (skipDeprecated) {
|
||||
plugins = plugins.stream()
|
||||
.filter(plugin -> !isDeprecated(plugin.getClass()))
|
||||
.toList();
|
||||
}
|
||||
boolean hasFailures = false;
|
||||
|
||||
for (RegisteredPlugin registeredPlugin : plugins) {
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.cli.commands.servers;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.cli.services.FileChangedEventListener;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
|
||||
@@ -44,6 +45,9 @@ public class StandAloneCommand extends AbstractServerCommand {
|
||||
@CommandLine.Option(names = {"-f", "--flow-path"}, description = "the flow path containing flow to inject at startup (when running with a memory flow repository)")
|
||||
private File flowPath;
|
||||
|
||||
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
|
||||
private String tenantId;
|
||||
|
||||
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to four times the number of available processors. Set it to 0 to avoid starting a worker.")
|
||||
private int workerThread = defaultWorkerThread();
|
||||
|
||||
@@ -98,7 +102,8 @@ public class StandAloneCommand extends AbstractServerCommand {
|
||||
if (flowPath != null) {
|
||||
try {
|
||||
LocalFlowRepositoryLoader localFlowRepositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
|
||||
localFlowRepositoryLoader.load(null, this.flowPath);
|
||||
TenantIdSelectorService tenantIdSelectorService = applicationContext.getBean(TenantIdSelectorService.class);
|
||||
localFlowRepositoryLoader.load(tenantIdSelectorService.getTenantId(this.tenantId), this.flowPath);
|
||||
} catch (IOException e) {
|
||||
throw new CommandLine.ParameterException(this.spec.commandLine(), "Invalid flow path", e);
|
||||
}
|
||||
|
||||
@@ -2,8 +2,8 @@ package io.kestra.cli.commands.templates;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.HttpResponse;
|
||||
import io.micronaut.http.MediaType;
|
||||
@@ -27,9 +27,8 @@ import java.nio.file.Path;
|
||||
public class TemplateExportCommand extends AbstractApiCommand {
|
||||
private static final String DEFAULT_FILE_NAME = "templates.zip";
|
||||
|
||||
// @FIXME: Keep it for bug in micronaut that need to have inject on top level command to inject on abstract classe
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
|
||||
public String namespace;
|
||||
@@ -43,7 +42,7 @@ public class TemplateExportCommand extends AbstractApiCommand {
|
||||
|
||||
try(DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<Object> request = HttpRequest
|
||||
.GET(apiUri("/templates/export/by-query") + (namespace != null ? "?namespace=" + namespace : ""))
|
||||
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
|
||||
.accept(MediaType.APPLICATION_OCTET_STREAM);
|
||||
|
||||
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.cli.commands.templates.namespaces;
|
||||
|
||||
import io.kestra.cli.AbstractValidateCommand;
|
||||
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.templates.TemplateEnabled;
|
||||
import io.kestra.core.serializers.YamlParser;
|
||||
@@ -10,6 +11,7 @@ import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.client.netty.DefaultHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@@ -27,6 +29,9 @@ import jakarta.validation.ConstraintViolationException;
|
||||
@TemplateEnabled
|
||||
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
@@ -44,7 +49,7 @@ public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpda
|
||||
|
||||
try (DefaultHttpClient client = client()) {
|
||||
MutableHttpRequest<List<Template>> request = HttpRequest
|
||||
.POST(apiUri("/templates/") + namespace + "?delete=" + delete, templates);
|
||||
.POST(apiUri("/templates/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, templates);
|
||||
|
||||
List<UpdateResult> updated = client.toBlocking().retrieve(
|
||||
this.requestOptions(request),
|
||||
|
||||
@@ -162,7 +162,15 @@ public class FileChangedEventListener {
|
||||
}
|
||||
|
||||
} catch (NoSuchFileException e) {
|
||||
log.error("File not found: {}", entry, e);
|
||||
log.warn("File not found: {}, deleting it", entry, e);
|
||||
// the file might have been deleted while reading so if not found we try to delete the flow
|
||||
flows.stream()
|
||||
.filter(flow -> flow.getPath().equals(filePath.toString()))
|
||||
.findFirst()
|
||||
.ifPresent(flowWithPath -> {
|
||||
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
|
||||
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
|
||||
});
|
||||
} catch (IOException e) {
|
||||
log.error("Error reading file: {}", entry, e);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
package io.kestra.cli.services;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
@Singleton
|
||||
public class TenantIdSelectorService {
|
||||
|
||||
//For override purpose in Kestra EE
|
||||
public String getTenantId(String tenantId) {
|
||||
if (StringUtils.isNotBlank(tenantId) && !MAIN_TENANT.equals(tenantId)){
|
||||
throw new KestraRuntimeException("Tenant id can only be 'main'");
|
||||
}
|
||||
return MAIN_TENANT;
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,10 @@ micronaut:
|
||||
root:
|
||||
paths: classpath:root
|
||||
mapping: /**
|
||||
codec:
|
||||
json:
|
||||
additional-types:
|
||||
- application/scim+json
|
||||
server:
|
||||
max-request-size: 10GB
|
||||
multipart:
|
||||
@@ -78,8 +82,19 @@ micronaut:
|
||||
type: scheduled
|
||||
core-pool-size: 1
|
||||
|
||||
# Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
|
||||
metrics:
|
||||
binders:
|
||||
retry:
|
||||
enabled: true
|
||||
netty:
|
||||
queues:
|
||||
enabled: true
|
||||
bytebuf-allocators:
|
||||
enabled: true
|
||||
channels:
|
||||
enabled: true
|
||||
|
||||
# Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
|
||||
export:
|
||||
otlp:
|
||||
enabled: false
|
||||
@@ -92,6 +107,8 @@ jackson:
|
||||
serialization-inclusion: non_null
|
||||
deserialization:
|
||||
FAIL_ON_UNKNOWN_PROPERTIES: false
|
||||
mapper:
|
||||
ACCEPT_CASE_INSENSITIVE_ENUMS: true
|
||||
|
||||
endpoints:
|
||||
all:
|
||||
@@ -100,6 +117,10 @@ endpoints:
|
||||
sensitive: false
|
||||
health:
|
||||
details-visible: ANONYMOUS
|
||||
disk-space:
|
||||
enabled: false
|
||||
discovery-client:
|
||||
enabled: false
|
||||
loggers:
|
||||
write-sensitive: false
|
||||
env:
|
||||
@@ -133,12 +154,44 @@ kestra:
|
||||
tutorial-flows:
|
||||
# Automatically loads all tutorial flows at startup.
|
||||
enabled: true
|
||||
|
||||
retries:
|
||||
attempts: 5
|
||||
multiplier: 2.0
|
||||
delay: 1s
|
||||
maxDelay: ""
|
||||
|
||||
server:
|
||||
basic-auth:
|
||||
# These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
|
||||
open-urls:
|
||||
- "/ping"
|
||||
- "/api/v1/executions/webhook/"
|
||||
|
||||
preview:
|
||||
initial-rows: 100
|
||||
max-rows: 5000
|
||||
|
||||
# The expected time for this server to complete all its tasks before initiating a graceful shutdown.
|
||||
terminationGracePeriod: 5m
|
||||
workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
|
||||
# Configuration for Liveness and Heartbeat mechanism between servers.
|
||||
liveness:
|
||||
enabled: true
|
||||
# The expected time between liveness probe.
|
||||
interval: 10s
|
||||
# The timeout used to detect service failures.
|
||||
timeout: 1m
|
||||
# The time to wait before executing a liveness probe.
|
||||
initialDelay: 1m
|
||||
# The expected time between service heartbeats.
|
||||
heartbeatInterval: 3s
|
||||
service:
|
||||
purge:
|
||||
initial-delay: 1h
|
||||
fixed-delay: 1d
|
||||
retention: 30d
|
||||
|
||||
jdbc:
|
||||
queues:
|
||||
min-poll-interval: 25ms
|
||||
@@ -150,7 +203,7 @@ kestra:
|
||||
fixed-delay: 1h
|
||||
retention: 7d
|
||||
types:
|
||||
- type : io.kestra.core.models.executions.LogEntry
|
||||
- type: io.kestra.core.models.executions.LogEntry
|
||||
retention: 1h
|
||||
- type: io.kestra.core.models.executions.MetricEntry
|
||||
retention: 1h
|
||||
@@ -182,38 +235,12 @@ kestra:
|
||||
traces:
|
||||
root: DISABLED
|
||||
|
||||
server:
|
||||
basic-auth:
|
||||
enabled: false
|
||||
# These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
|
||||
open-urls:
|
||||
- "/ping"
|
||||
- "/api/v1/executions/webhook/"
|
||||
preview:
|
||||
initial-rows: 100
|
||||
max-rows: 5000
|
||||
# The expected time for this server to complete all its tasks before initiating a graceful shutdown.
|
||||
terminationGracePeriod: 5m
|
||||
workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
|
||||
# Configuration for Liveness and Heartbeat mechanism between servers.
|
||||
liveness:
|
||||
enabled: true
|
||||
# The expected time between liveness probe.
|
||||
interval: 10s
|
||||
# The timeout used to detect service failures.
|
||||
timeout: 1m
|
||||
# The time to wait before executing a liveness probe.
|
||||
initialDelay: 1m
|
||||
# The expected time between service heartbeats.
|
||||
heartbeatInterval: 3s
|
||||
service:
|
||||
purge:
|
||||
initial-delay: 1h
|
||||
fixed-delay: 1d
|
||||
retention: 30d
|
||||
ui-anonymous-usage-report:
|
||||
enabled: true
|
||||
|
||||
anonymous-usage-report:
|
||||
enabled: true
|
||||
uri: https://api.kestra.io/v1/reports/usages
|
||||
uri: https://api.kestra.io/v1/reports/server-events
|
||||
initial-delay: 5m
|
||||
fixed-delay: 1h
|
||||
|
||||
|
||||
@@ -108,6 +108,34 @@ class FlowCreateOrUpdateCommandTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void should_fail_with_incorrect_tenant() {
|
||||
URL directory = FlowCreateOrUpdateCommandTest.class.getClassLoader().getResource("flows");
|
||||
|
||||
ByteArrayOutputStream err = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(err));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"--tenant", "incorrect",
|
||||
directory.getPath(),
|
||||
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(err.toString()).contains("Tenant id can only be 'main'");
|
||||
err.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void helper() {
|
||||
URL directory = FlowCreateOrUpdateCommandTest.class.getClassLoader().getResource("helper");
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
package io.kestra.cli.commands.servers;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.cli.App;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class TenantIdSelectorServiceTest {
|
||||
|
||||
@Test
|
||||
void should_fail_without_tenant_id() {
|
||||
ByteArrayOutputStream err = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(err));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
String[] start = {
|
||||
"server", "standalone",
|
||||
"-f", "unused",
|
||||
"--tenant", "wrong_tenant"
|
||||
};
|
||||
PicocliRunner.call(App.class, ctx, start);
|
||||
|
||||
assertThat(err.toString()).contains("Tenant id can only be 'main'");
|
||||
err.reset();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -17,7 +17,7 @@ kestra:
|
||||
central:
|
||||
url: https://repo.maven.apache.org/maven2/
|
||||
sonatype:
|
||||
url: https://s01.oss.sonatype.org/content/repositories/snapshots/
|
||||
url: https://central.sonatype.com/repository/maven-snapshots/
|
||||
server:
|
||||
liveness:
|
||||
enabled: false
|
||||
|
||||
14
codecov.yml
14
codecov.yml
@@ -56,21 +56,23 @@ component_management:
|
||||
name: Tests
|
||||
paths:
|
||||
- tests/**
|
||||
- component_id: ui
|
||||
name: Ui
|
||||
paths:
|
||||
- ui/**
|
||||
|
||||
- component_id: webserver
|
||||
name: Webserver
|
||||
paths:
|
||||
- webserver/**
|
||||
|
||||
ignore:
|
||||
- ui/**
|
||||
# we are not mature yet to have a ui code coverage
|
||||
|
||||
flag_management:
|
||||
default_rules:
|
||||
carryforward: true
|
||||
statuses:
|
||||
- type: project
|
||||
target: 73% # current value as of 2025-06-25
|
||||
threshold: 1%
|
||||
target: 70%
|
||||
threshold: 10%
|
||||
- type: patch
|
||||
target: 75%
|
||||
threshold: 10%
|
||||
|
||||
@@ -37,6 +37,7 @@ dependencies {
|
||||
implementation 'nl.basjes.gitignore:gitignore-reader'
|
||||
implementation group: 'dev.failsafe', name: 'failsafe'
|
||||
implementation 'com.github.ben-manes.caffeine:caffeine'
|
||||
implementation 'com.github.ksuid:ksuid:1.1.3'
|
||||
api 'org.apache.httpcomponents.client5:httpclient5'
|
||||
|
||||
// plugins
|
||||
@@ -62,6 +63,10 @@ dependencies {
|
||||
exclude group: 'com.fasterxml.jackson.core'
|
||||
}
|
||||
|
||||
// micrometer
|
||||
implementation "io.micronaut.micrometer:micronaut-micrometer-observation"
|
||||
implementation 'io.micrometer:micrometer-java21'
|
||||
|
||||
// test
|
||||
testAnnotationProcessor project(':processor')
|
||||
testImplementation project(':tests')
|
||||
|
||||
@@ -65,9 +65,4 @@ public @interface Retryable {
|
||||
* (defaults to none)
|
||||
*/
|
||||
Class<? extends RetryPredicate> predicate() default DefaultRetryPredicate.class;
|
||||
|
||||
/**
|
||||
* @return The multiplier to use to calculate the delay
|
||||
*/
|
||||
String jitter() default "${kestra.retries.jitter:0.0}";
|
||||
}
|
||||
26
core/src/main/java/io/kestra/core/debug/Breakpoint.java
Normal file
26
core/src/main/java/io/kestra/core/debug/Breakpoint.java
Normal file
@@ -0,0 +1,26 @@
|
||||
package io.kestra.core.debug;
|
||||
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Getter
|
||||
public class Breakpoint {
|
||||
@NotNull
|
||||
private String id;
|
||||
|
||||
@Nullable
|
||||
private String value;
|
||||
|
||||
public static Breakpoint of(String breakpoint) {
|
||||
if (breakpoint.indexOf('.') > 0) {
|
||||
return new Breakpoint(breakpoint.substring(0, breakpoint.indexOf('.')), breakpoint.substring(breakpoint.indexOf('.') + 1));
|
||||
} else {
|
||||
return new Breakpoint(breakpoint, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,7 @@ public class JsonSchemaCache {
|
||||
private final JsonSchemaGenerator jsonSchemaGenerator;
|
||||
|
||||
private final ConcurrentMap<CacheKey, Map<String, Object>> schemaCache = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<SchemaType, Map<String, Object>> propertiesCache = new ConcurrentHashMap<>();
|
||||
|
||||
private final Map<SchemaType, Class<?>> classesBySchemaType = new HashMap<>();
|
||||
|
||||
@@ -44,7 +45,7 @@ public class JsonSchemaCache {
|
||||
|
||||
public Map<String, Object> getSchemaForType(final SchemaType type,
|
||||
final boolean arrayOf) {
|
||||
return schemaCache.computeIfAbsent(new CacheKey(type, arrayOf), (key) -> {
|
||||
return schemaCache.computeIfAbsent(new CacheKey(type, arrayOf), key -> {
|
||||
|
||||
Class<?> cls = Optional.ofNullable(classesBySchemaType.get(type))
|
||||
.orElseThrow(() -> new IllegalArgumentException("Cannot found schema for type '" + type + "'"));
|
||||
@@ -52,6 +53,16 @@ public class JsonSchemaCache {
|
||||
});
|
||||
}
|
||||
|
||||
public Map<String, Object> getPropertiesForType(final SchemaType type) {
|
||||
return propertiesCache.computeIfAbsent(type, key -> {
|
||||
|
||||
Class<?> cls = Optional.ofNullable(classesBySchemaType.get(type))
|
||||
.orElseThrow(() -> new IllegalArgumentException("Cannot found properties for type '" + type + "'"));
|
||||
return jsonSchemaGenerator.properties(null, cls);
|
||||
});
|
||||
}
|
||||
|
||||
// must be public as it's used in EE
|
||||
public void registerClassForType(final SchemaType type, final Class<?> clazz) {
|
||||
classesBySchemaType.put(type, clazz);
|
||||
}
|
||||
|
||||
@@ -122,12 +122,13 @@ public class JsonSchemaGenerator {
|
||||
if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
|
||||
List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
|
||||
.map(JsonNode::asText)
|
||||
.toList();
|
||||
.collect(Collectors.toList());
|
||||
|
||||
properties.fields().forEachRemaining(e -> {
|
||||
int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
|
||||
if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
|
||||
requiredPropsNode.remove(indexInRequiredArray);
|
||||
requiredFieldValues.remove(indexInRequiredArray);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -23,29 +23,25 @@ public class Plugin {
|
||||
private String group;
|
||||
private String version;
|
||||
private Map<String, String> manifest;
|
||||
private List<String> tasks;
|
||||
private List<String> triggers;
|
||||
private List<String> conditions;
|
||||
private List<String> controllers;
|
||||
private List<String> storages;
|
||||
private List<String> secrets;
|
||||
private List<String> taskRunners;
|
||||
private List<String> guides;
|
||||
private List<String> aliases;
|
||||
private List<String> apps;
|
||||
private List<String> appBlocks;
|
||||
private List<String> charts;
|
||||
private List<String> dataFilters;
|
||||
private List<String> logExporters;
|
||||
private List<String> additionalPlugins;
|
||||
private List<PluginElementMetadata> tasks;
|
||||
private List<PluginElementMetadata> triggers;
|
||||
private List<PluginElementMetadata> conditions;
|
||||
private List<PluginElementMetadata> controllers;
|
||||
private List<PluginElementMetadata> storages;
|
||||
private List<PluginElementMetadata> secrets;
|
||||
private List<PluginElementMetadata> taskRunners;
|
||||
private List<PluginElementMetadata> apps;
|
||||
private List<PluginElementMetadata> appBlocks;
|
||||
private List<PluginElementMetadata> charts;
|
||||
private List<PluginElementMetadata> dataFilters;
|
||||
private List<PluginElementMetadata> logExporters;
|
||||
private List<PluginElementMetadata> additionalPlugins;
|
||||
private List<PluginSubGroup.PluginCategory> categories;
|
||||
private String subGroup;
|
||||
|
||||
public static Plugin of(RegisteredPlugin registeredPlugin, @Nullable String subgroup) {
|
||||
return Plugin.of(registeredPlugin, subgroup, true);
|
||||
}
|
||||
|
||||
public static Plugin of(RegisteredPlugin registeredPlugin, @Nullable String subgroup, boolean includeDeprecated) {
|
||||
Plugin plugin = new Plugin();
|
||||
plugin.name = registeredPlugin.name();
|
||||
PluginSubGroup subGroupInfos = null;
|
||||
@@ -90,18 +86,18 @@ public class Plugin {
|
||||
plugin.subGroup = subgroup;
|
||||
|
||||
Predicate<Class<?>> packagePredicate = c -> subgroup == null || c.getPackageName().equals(subgroup);
|
||||
plugin.tasks = filterAndGetClassName(registeredPlugin.getTasks(), includeDeprecated, packagePredicate);
|
||||
plugin.triggers = filterAndGetClassName(registeredPlugin.getTriggers(), includeDeprecated, packagePredicate);
|
||||
plugin.conditions = filterAndGetClassName(registeredPlugin.getConditions(), includeDeprecated, packagePredicate);
|
||||
plugin.storages = filterAndGetClassName(registeredPlugin.getStorages(), includeDeprecated, packagePredicate);
|
||||
plugin.secrets = filterAndGetClassName(registeredPlugin.getSecrets(), includeDeprecated, packagePredicate);
|
||||
plugin.taskRunners = filterAndGetClassName(registeredPlugin.getTaskRunners(), includeDeprecated, packagePredicate);
|
||||
plugin.apps = filterAndGetClassName(registeredPlugin.getApps(), includeDeprecated, packagePredicate);
|
||||
plugin.appBlocks = filterAndGetClassName(registeredPlugin.getAppBlocks(), includeDeprecated, packagePredicate);
|
||||
plugin.charts = filterAndGetClassName(registeredPlugin.getCharts(), includeDeprecated, packagePredicate);
|
||||
plugin.dataFilters = filterAndGetClassName(registeredPlugin.getDataFilters(), includeDeprecated, packagePredicate);
|
||||
plugin.logExporters = filterAndGetClassName(registeredPlugin.getLogExporters(), includeDeprecated, packagePredicate);
|
||||
plugin.additionalPlugins = filterAndGetClassName(registeredPlugin.getAdditionalPlugins(), includeDeprecated, packagePredicate);
|
||||
plugin.tasks = filterAndGetTypeWithMetadata(registeredPlugin.getTasks(), packagePredicate);
|
||||
plugin.triggers = filterAndGetTypeWithMetadata(registeredPlugin.getTriggers(), packagePredicate);
|
||||
plugin.conditions = filterAndGetTypeWithMetadata(registeredPlugin.getConditions(), packagePredicate);
|
||||
plugin.storages = filterAndGetTypeWithMetadata(registeredPlugin.getStorages(), packagePredicate);
|
||||
plugin.secrets = filterAndGetTypeWithMetadata(registeredPlugin.getSecrets(), packagePredicate);
|
||||
plugin.taskRunners = filterAndGetTypeWithMetadata(registeredPlugin.getTaskRunners(), packagePredicate);
|
||||
plugin.apps = filterAndGetTypeWithMetadata(registeredPlugin.getApps(), packagePredicate);
|
||||
plugin.appBlocks = filterAndGetTypeWithMetadata(registeredPlugin.getAppBlocks(), packagePredicate);
|
||||
plugin.charts = filterAndGetTypeWithMetadata(registeredPlugin.getCharts(), packagePredicate);
|
||||
plugin.dataFilters = filterAndGetTypeWithMetadata(registeredPlugin.getDataFilters(), packagePredicate);
|
||||
plugin.logExporters = filterAndGetTypeWithMetadata(registeredPlugin.getLogExporters(), packagePredicate);
|
||||
plugin.additionalPlugins = filterAndGetTypeWithMetadata(registeredPlugin.getAdditionalPlugins(), packagePredicate);
|
||||
|
||||
return plugin;
|
||||
}
|
||||
@@ -111,17 +107,18 @@ public class Plugin {
|
||||
* Those classes are only filtered from the documentation to ensure backward compatibility.
|
||||
*
|
||||
* @param list The list of classes?
|
||||
* @param includeDeprecated whether to include deprecated plugins or not
|
||||
* @return a filtered streams.
|
||||
*/
|
||||
private static List<String> filterAndGetClassName(final List<? extends Class<?>> list, boolean includeDeprecated, Predicate<Class<?>> clazzFilter) {
|
||||
private static List<PluginElementMetadata> filterAndGetTypeWithMetadata(final List<? extends Class<?>> list, Predicate<Class<?>> clazzFilter) {
|
||||
return list
|
||||
.stream()
|
||||
.filter(not(io.kestra.core.models.Plugin::isInternal))
|
||||
.filter(p -> includeDeprecated || !io.kestra.core.models.Plugin.isDeprecated(p))
|
||||
.filter(clazzFilter)
|
||||
.map(Class::getName)
|
||||
.filter(c -> !c.startsWith("org.kestra."))
|
||||
.filter(c -> !c.getName().startsWith("org.kestra."))
|
||||
.map(c -> new PluginElementMetadata(c.getName(), io.kestra.core.models.Plugin.isDeprecated(c) ? true : null))
|
||||
.toList();
|
||||
}
|
||||
|
||||
public record PluginElementMetadata(String cls, Boolean deprecated) {
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.util.List;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* General exception that can be throws when a resource fail validation.
|
||||
*/
|
||||
public class ValidationErrorException extends KestraRuntimeException {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
private static final String VALIDATION_ERROR_MESSAGE = "Resource fails validation";
|
||||
|
||||
@Getter
|
||||
private transient final List<String> invalids;
|
||||
|
||||
/**
|
||||
* Creates a new {@link ValidationErrorException} instance.
|
||||
*
|
||||
* @param invalids the invalid filters.
|
||||
*/
|
||||
public ValidationErrorException(final List<String> invalids) {
|
||||
super(VALIDATION_ERROR_MESSAGE);
|
||||
this.invalids = invalids;
|
||||
}
|
||||
|
||||
|
||||
public String formatedInvalidObjects(){
|
||||
if (invalids == null || invalids.isEmpty()){
|
||||
return VALIDATION_ERROR_MESSAGE;
|
||||
}
|
||||
return String.join(", ", invalids);
|
||||
}
|
||||
}
|
||||
@@ -6,8 +6,14 @@ import io.kestra.core.http.HttpRequest;
|
||||
import io.kestra.core.http.HttpResponse;
|
||||
import io.kestra.core.http.client.apache.*;
|
||||
import io.kestra.core.http.client.configurations.HttpConfiguration;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.micrometer.common.KeyValues;
|
||||
import io.micrometer.core.instrument.binder.httpcomponents.hc5.ApacheHttpClientContext;
|
||||
import io.micrometer.core.instrument.binder.httpcomponents.hc5.DefaultApacheHttpClientObservationConvention;
|
||||
import io.micrometer.core.instrument.binder.httpcomponents.hc5.ObservationExecChainHandler;
|
||||
import io.micrometer.observation.ObservationRegistry;
|
||||
import io.micronaut.http.MediaType;
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.Builder;
|
||||
@@ -16,6 +22,7 @@ import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.hc.client5.http.ContextBuilder;
|
||||
import org.apache.hc.client5.http.auth.*;
|
||||
import org.apache.hc.client5.http.config.ConnectionConfig;
|
||||
import org.apache.hc.client5.http.impl.ChainElement;
|
||||
import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy;
|
||||
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
|
||||
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
|
||||
@@ -50,11 +57,16 @@ public class HttpClient implements Closeable {
|
||||
private transient CloseableHttpClient client;
|
||||
private final RunContext runContext;
|
||||
private final HttpConfiguration configuration;
|
||||
private ObservationRegistry observationRegistry;
|
||||
|
||||
@Builder
|
||||
public HttpClient(RunContext runContext, @Nullable HttpConfiguration configuration) throws IllegalVariableEvaluationException {
|
||||
this.runContext = runContext;
|
||||
this.configuration = configuration == null ? HttpConfiguration.builder().build() : configuration;
|
||||
if (runContext instanceof DefaultRunContext defaultRunContext) {
|
||||
this.observationRegistry = defaultRunContext.getApplicationContext().findBean(ObservationRegistry.class).orElse(null);
|
||||
}
|
||||
|
||||
this.client = this.createClient();
|
||||
}
|
||||
|
||||
@@ -67,6 +79,13 @@ public class HttpClient implements Closeable {
|
||||
.disableDefaultUserAgent()
|
||||
.setUserAgent("Kestra");
|
||||
|
||||
if (observationRegistry != null) {
|
||||
// micrometer, must be placed before the retry strategy (see https://docs.micrometer.io/micrometer/reference/reference/httpcomponents.html#_retry_strategy_considerations)
|
||||
builder.addExecInterceptorAfter(ChainElement.RETRY.name(), "micrometer",
|
||||
new ObservationExecChainHandler(observationRegistry, new CustomApacheHttpClientObservationConvention())
|
||||
);
|
||||
}
|
||||
|
||||
// logger
|
||||
if (this.configuration.getLogs() != null && this.configuration.getLogs().length > 0) {
|
||||
if (ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.REQUEST_HEADERS) ||
|
||||
@@ -297,4 +316,14 @@ public class HttpClient implements Closeable {
|
||||
this.client.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static class CustomApacheHttpClientObservationConvention extends DefaultApacheHttpClientObservationConvention {
|
||||
@Override
|
||||
public KeyValues getLowCardinalityKeyValues(ApacheHttpClientContext context) {
|
||||
return KeyValues.concat(
|
||||
super.getLowCardinalityKeyValues(context),
|
||||
KeyValues.of("type", "core-client")
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
package io.kestra.core.metrics;
|
||||
|
||||
import io.micrometer.core.instrument.binder.jvm.JvmThreadDeadlockMetrics;
|
||||
import io.micrometer.java21.instrument.binder.jdk.VirtualThreadMetrics;
|
||||
import io.micronaut.configuration.metrics.annotation.RequiresMetrics;
|
||||
import io.micronaut.context.annotation.Bean;
|
||||
import io.micronaut.context.annotation.Factory;
|
||||
import io.micronaut.context.annotation.Primary;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import static io.micronaut.configuration.metrics.micrometer.MeterRegistryFactory.MICRONAUT_METRICS_BINDERS;
|
||||
import static io.micronaut.core.util.StringUtils.FALSE;
|
||||
|
||||
@Factory
|
||||
@RequiresMetrics
|
||||
|
||||
public class MeterRegistryBinderFactory {
|
||||
@Bean
|
||||
@Primary
|
||||
@Singleton
|
||||
@Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
|
||||
public VirtualThreadMetrics virtualThreadMetrics() {
|
||||
return new VirtualThreadMetrics();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@Primary
|
||||
@Singleton
|
||||
@Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
|
||||
public JvmThreadDeadlockMetrics threadDeadlockMetricsMetrics() {
|
||||
return new JvmThreadDeadlockMetrics();
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,10 @@
|
||||
package io.kestra.core.models;
|
||||
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public record Label(@NotNull String key, @NotNull String value) {
|
||||
@@ -29,11 +28,36 @@ public record Label(@NotNull String key, @NotNull String value) {
|
||||
* @return the nested {@link Map}.
|
||||
*/
|
||||
public static Map<String, Object> toNestedMap(List<Label> labels) {
|
||||
Map<String, Object> asMap = labels.stream()
|
||||
return MapUtils.flattenToNestedMap(toMap(labels));
|
||||
}
|
||||
|
||||
/**
|
||||
* Static helper method for converting a list of labels to a flat map.
|
||||
* Key order is kept.
|
||||
*
|
||||
* @param labels The list of {@link Label} to be converted.
|
||||
* @return the flat {@link Map}.
|
||||
*/
|
||||
public static Map<String, String> toMap(@Nullable List<Label> labels) {
|
||||
if (labels == null || labels.isEmpty()) return Collections.emptyMap();
|
||||
return labels.stream()
|
||||
.filter(label -> label.value() != null && label.key() != null)
|
||||
// using an accumulator in case labels with the same key exists: the first is kept
|
||||
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> first));
|
||||
return MapUtils.flattenToNestedMap(asMap);
|
||||
// using an accumulator in case labels with the same key exists: the second is kept
|
||||
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
|
||||
}
|
||||
|
||||
/**
|
||||
* Static helper method for deduplicating a list of labels by their key.
|
||||
* Value of the last key occurrence is kept.
|
||||
*
|
||||
* @param labels The list of {@link Label} to be deduplicated.
|
||||
* @return the deduplicated {@link List}.
|
||||
*/
|
||||
public static List<Label> deduplicate(@Nullable List<Label> labels) {
|
||||
if (labels == null || labels.isEmpty()) return Collections.emptyList();
|
||||
return toMap(labels).entrySet().stream()
|
||||
.map(entry -> new Label(entry.getKey(), entry.getValue()))
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -6,9 +6,9 @@ import com.fasterxml.jackson.annotation.JsonValue;
|
||||
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
||||
import io.kestra.core.models.dashboards.filters.*;
|
||||
import io.kestra.core.utils.Enums;
|
||||
import java.util.ArrayList;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -49,42 +49,27 @@ public record QueryFilter(
|
||||
PREFIX
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private List<Object> asValues(Object value) {
|
||||
return value instanceof String valueStr ? Arrays.asList(valueStr.split(",")) : (List<Object>) value;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T extends Enum<T>> AbstractFilter<T> toDashboardFilterBuilder(T field, Object value) {
|
||||
switch (this.operation) {
|
||||
case EQUALS:
|
||||
return EqualTo.<T>builder().field(field).value(value).build();
|
||||
case NOT_EQUALS:
|
||||
return NotEqualTo.<T>builder().field(field).value(value).build();
|
||||
case GREATER_THAN:
|
||||
return GreaterThan.<T>builder().field(field).value(value).build();
|
||||
case LESS_THAN:
|
||||
return LessThan.<T>builder().field(field).value(value).build();
|
||||
case GREATER_THAN_OR_EQUAL_TO:
|
||||
return GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
|
||||
case LESS_THAN_OR_EQUAL_TO:
|
||||
return LessThanOrEqualTo.<T>builder().field(field).value(value).build();
|
||||
case IN:
|
||||
return In.<T>builder().field(field).values(asValues(value)).build();
|
||||
case NOT_IN:
|
||||
return NotIn.<T>builder().field(field).values(asValues(value)).build();
|
||||
case STARTS_WITH:
|
||||
return StartsWith.<T>builder().field(field).value(value.toString()).build();
|
||||
case ENDS_WITH:
|
||||
return EndsWith.<T>builder().field(field).value(value.toString()).build();
|
||||
case CONTAINS:
|
||||
return Contains.<T>builder().field(field).value(value.toString()).build();
|
||||
case REGEX:
|
||||
return Regex.<T>builder().field(field).value(value.toString()).build();
|
||||
case PREFIX:
|
||||
return Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported operation: " + this.operation);
|
||||
}
|
||||
return switch (this.operation) {
|
||||
case EQUALS -> EqualTo.<T>builder().field(field).value(value).build();
|
||||
case NOT_EQUALS -> NotEqualTo.<T>builder().field(field).value(value).build();
|
||||
case GREATER_THAN -> GreaterThan.<T>builder().field(field).value(value).build();
|
||||
case LESS_THAN -> LessThan.<T>builder().field(field).value(value).build();
|
||||
case GREATER_THAN_OR_EQUAL_TO -> GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
|
||||
case LESS_THAN_OR_EQUAL_TO -> LessThanOrEqualTo.<T>builder().field(field).value(value).build();
|
||||
case IN -> In.<T>builder().field(field).values(asValues(value)).build();
|
||||
case NOT_IN -> NotIn.<T>builder().field(field).values(asValues(value)).build();
|
||||
case STARTS_WITH -> StartsWith.<T>builder().field(field).value(value.toString()).build();
|
||||
case ENDS_WITH -> EndsWith.<T>builder().field(field).value(value.toString()).build();
|
||||
case CONTAINS -> Contains.<T>builder().field(field).value(value.toString()).build();
|
||||
case REGEX -> Regex.<T>builder().field(field).value(value.toString()).build();
|
||||
case PREFIX -> Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
|
||||
};
|
||||
}
|
||||
|
||||
public enum Field {
|
||||
|
||||
@@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
||||
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Streams;
|
||||
import io.kestra.core.debug.Breakpoint;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.DeletedInterface;
|
||||
import io.kestra.core.models.Label;
|
||||
@@ -24,6 +25,7 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
|
||||
import io.kestra.core.services.LabelService;
|
||||
import io.kestra.core.test.flow.TaskFixture;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import jakarta.annotation.Nullable;
|
||||
@@ -120,6 +122,9 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
@Nullable
|
||||
ExecutionKind kind;
|
||||
|
||||
@Nullable
|
||||
List<Breakpoint> breakpoints;
|
||||
|
||||
/**
|
||||
* Factory method for constructing a new {@link Execution} object for the given {@link Flow}.
|
||||
*
|
||||
@@ -132,7 +137,7 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
}
|
||||
|
||||
public List<Label> getLabels() {
|
||||
return Optional.ofNullable(this.labels).orElse(new ArrayList<>());
|
||||
return ListUtils.emptyOnNull(this.labels);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -177,8 +182,22 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Customization of Lombok-generated builder.
|
||||
*/
|
||||
public static class ExecutionBuilder {
|
||||
|
||||
/**
|
||||
* Enforce unique values of {@link Label} when using the builder.
|
||||
*
|
||||
* @param labels The labels.
|
||||
* @return Deduplicated labels.
|
||||
*/
|
||||
public ExecutionBuilder labels(List<Label> labels) {
|
||||
this.labels = Label.deduplicate(labels);
|
||||
return this;
|
||||
}
|
||||
|
||||
void prebuild() {
|
||||
this.originalId = this.id;
|
||||
this.metadata = ExecutionMetadata.builder()
|
||||
@@ -221,12 +240,12 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
this.scheduleDate,
|
||||
this.traceParent,
|
||||
this.fixtures,
|
||||
this.kind
|
||||
this.kind,
|
||||
this.breakpoints
|
||||
);
|
||||
}
|
||||
|
||||
public Execution withLabels(List<Label> labels) {
|
||||
|
||||
return new Execution(
|
||||
this.tenantId,
|
||||
this.id,
|
||||
@@ -236,7 +255,7 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
this.taskRunList,
|
||||
this.inputs,
|
||||
this.outputs,
|
||||
labels,
|
||||
Label.deduplicate(labels),
|
||||
this.variables,
|
||||
this.state,
|
||||
this.parentId,
|
||||
@@ -247,7 +266,8 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
this.scheduleDate,
|
||||
this.traceParent,
|
||||
this.fixtures,
|
||||
this.kind
|
||||
this.kind,
|
||||
this.breakpoints
|
||||
);
|
||||
}
|
||||
|
||||
@@ -286,7 +306,34 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
this.scheduleDate,
|
||||
this.traceParent,
|
||||
this.fixtures,
|
||||
this.kind
|
||||
this.kind,
|
||||
this.breakpoints
|
||||
);
|
||||
}
|
||||
|
||||
public Execution withBreakpoints(List<Breakpoint> newBreakpoints) {
|
||||
return new Execution(
|
||||
this.tenantId,
|
||||
this.id,
|
||||
this.namespace,
|
||||
this.flowId,
|
||||
this.flowRevision,
|
||||
this.taskRunList,
|
||||
this.inputs,
|
||||
this.outputs,
|
||||
this.labels,
|
||||
this.variables,
|
||||
this.state,
|
||||
this.parentId,
|
||||
this.originalId,
|
||||
this.trigger,
|
||||
this.deleted,
|
||||
this.metadata,
|
||||
this.scheduleDate,
|
||||
this.traceParent,
|
||||
this.fixtures,
|
||||
this.kind,
|
||||
newBreakpoints
|
||||
);
|
||||
}
|
||||
|
||||
@@ -312,7 +359,8 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
this.scheduleDate,
|
||||
this.traceParent,
|
||||
this.fixtures,
|
||||
this.kind
|
||||
this.kind,
|
||||
this.breakpoints
|
||||
);
|
||||
}
|
||||
|
||||
@@ -366,7 +414,7 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
*
|
||||
* @param resolvedTasks normal tasks
|
||||
* @param resolvedErrors errors tasks
|
||||
* @param resolvedErrors finally tasks
|
||||
* @param resolvedFinally finally tasks
|
||||
* @return the flow we need to follow
|
||||
*/
|
||||
public List<ResolvedTask> findTaskDependingFlowState(
|
||||
@@ -992,6 +1040,16 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find all children of this {@link TaskRun}.
|
||||
*/
|
||||
public List<TaskRun> findChildren(TaskRun parentTaskRun) {
|
||||
return taskRunList.stream()
|
||||
.filter(taskRun -> parentTaskRun.getId().equals(taskRun.getParentTaskRunId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
|
||||
public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
|
||||
return (withCurrent ?
|
||||
Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :
|
||||
|
||||
@@ -3,8 +3,9 @@ package io.kestra.core.models.executions;
|
||||
/**
|
||||
* Describe the kind of execution:
|
||||
* - TEST: created by a test
|
||||
* - PLAYGROUND: created by a playground
|
||||
* - NORMAL: anything else, for backward compatibility NORMAL is not persisted but null is used instead
|
||||
*/
|
||||
public enum ExecutionKind {
|
||||
NORMAL, TEST
|
||||
NORMAL, TEST, PLAYGROUND
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
import lombok.*;
|
||||
@@ -62,6 +63,11 @@ public class TaskRun implements TenantInterface {
|
||||
@With
|
||||
Boolean dynamic;
|
||||
|
||||
// Set it to true to force execution even if the execution is killed
|
||||
@Nullable
|
||||
@With
|
||||
Boolean forceExecution;
|
||||
|
||||
@Deprecated
|
||||
public void setItems(String items) {
|
||||
// no-op for backward compatibility
|
||||
@@ -81,7 +87,8 @@ public class TaskRun implements TenantInterface {
|
||||
this.outputs,
|
||||
this.state.withState(state),
|
||||
this.iteration,
|
||||
this.dynamic
|
||||
this.dynamic,
|
||||
this.forceExecution
|
||||
);
|
||||
}
|
||||
|
||||
@@ -99,7 +106,8 @@ public class TaskRun implements TenantInterface {
|
||||
this.outputs,
|
||||
newState,
|
||||
this.iteration,
|
||||
this.dynamic
|
||||
this.dynamic,
|
||||
this.forceExecution
|
||||
);
|
||||
}
|
||||
|
||||
@@ -121,7 +129,8 @@ public class TaskRun implements TenantInterface {
|
||||
this.outputs,
|
||||
this.state.withState(State.Type.FAILED),
|
||||
this.iteration,
|
||||
this.dynamic
|
||||
this.dynamic,
|
||||
this.forceExecution
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.validations.FlowValidation;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
@@ -30,8 +29,6 @@ import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import lombok.*;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -189,19 +186,32 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
.toList();
|
||||
}
|
||||
|
||||
public List<Task> allErrorsWithChilds() {
|
||||
public List<Task> allErrorsWithChildren() {
|
||||
var allErrors = allTasksWithChilds().stream()
|
||||
.filter(task -> task.isFlowable() && ((FlowableTask<?>) task).getErrors() != null)
|
||||
.flatMap(task -> ((FlowableTask<?>) task).getErrors().stream())
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
|
||||
if (this.getErrors() != null && !this.getErrors().isEmpty()) {
|
||||
if (!ListUtils.isEmpty(this.getErrors())) {
|
||||
allErrors.addAll(this.getErrors());
|
||||
}
|
||||
|
||||
return allErrors;
|
||||
}
|
||||
|
||||
public List<Task> allFinallyWithChildren() {
|
||||
var allFinally = allTasksWithChilds().stream()
|
||||
.filter(task -> task.isFlowable() && ((FlowableTask<?>) task).getFinally() != null)
|
||||
.flatMap(task -> ((FlowableTask<?>) task).getFinally().stream())
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
|
||||
if (!ListUtils.isEmpty(this.getFinally())) {
|
||||
allFinally.addAll(this.getFinally());
|
||||
}
|
||||
|
||||
return allFinally;
|
||||
}
|
||||
|
||||
public Task findParentTasksByTaskId(String taskId) {
|
||||
return allTasksWithChilds()
|
||||
.stream()
|
||||
|
||||
@@ -116,7 +116,7 @@ public class State {
|
||||
}
|
||||
|
||||
public Instant maxDate() {
|
||||
if (this.histories.size() == 0) {
|
||||
if (this.histories.isEmpty()) {
|
||||
return Instant.now();
|
||||
}
|
||||
|
||||
@@ -124,7 +124,7 @@ public class State {
|
||||
}
|
||||
|
||||
public Instant minDate() {
|
||||
if (this.histories.size() == 0) {
|
||||
if (this.histories.isEmpty()) {
|
||||
return Instant.now();
|
||||
}
|
||||
|
||||
@@ -168,6 +168,16 @@ public class State {
|
||||
return this.current.isPaused();
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public boolean isBreakpoint() {
|
||||
return this.current.isBreakpoint();
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public boolean isQueued() {
|
||||
return this.current.isQueued();
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public boolean isRetrying() {
|
||||
return this.current.isRetrying();
|
||||
@@ -201,6 +211,14 @@ public class State {
|
||||
return this.histories.get(this.histories.size() - 2).state.isPaused();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if the execution has failed, then was restarted.
|
||||
* This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
|
||||
*/
|
||||
public boolean failedThenRestarted() {
|
||||
return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
|
||||
}
|
||||
|
||||
@Introspected
|
||||
public enum Type {
|
||||
CREATED,
|
||||
@@ -216,7 +234,8 @@ public class State {
|
||||
QUEUED,
|
||||
RETRYING,
|
||||
RETRIED,
|
||||
SKIPPED;
|
||||
SKIPPED,
|
||||
BREAKPOINT;
|
||||
|
||||
public boolean isTerminated() {
|
||||
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED;
|
||||
@@ -242,6 +261,10 @@ public class State {
|
||||
return this == Type.PAUSED;
|
||||
}
|
||||
|
||||
public boolean isBreakpoint() {
|
||||
return this == Type.BREAKPOINT;
|
||||
}
|
||||
|
||||
public boolean isRetrying() {
|
||||
return this == Type.RETRYING || this == Type.RETRIED;
|
||||
}
|
||||
@@ -254,6 +277,10 @@ public class State {
|
||||
return this == Type.KILLED;
|
||||
}
|
||||
|
||||
public boolean isQueued(){
|
||||
return this == Type.QUEUED;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return states that are terminal to an execution
|
||||
*/
|
||||
|
||||
@@ -20,8 +20,8 @@ public class FileInput extends Input<URI> {
|
||||
|
||||
private static final String DEFAULT_EXTENSION = ".upl";
|
||||
|
||||
@Builder.Default
|
||||
public String extension = DEFAULT_EXTENSION;
|
||||
@Deprecated(since = "0.24", forRemoval = true)
|
||||
public String extension;
|
||||
|
||||
@Override
|
||||
public void validate(URI input) throws ConstraintViolationException {
|
||||
@@ -32,6 +32,7 @@ public class FileInput extends Input<URI> {
|
||||
String res = inputs.stream()
|
||||
.filter(in -> in instanceof FileInput)
|
||||
.filter(in -> in.getId().equals(fileName))
|
||||
.filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
|
||||
.map(flowInput -> ((FileInput) flowInput).getExtension())
|
||||
.findFirst()
|
||||
.orElse(FileInput.DEFAULT_EXTENSION);
|
||||
|
||||
@@ -108,7 +108,7 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
|
||||
private List<String> renderExpressionValues(final Function<String, Object> renderer) {
|
||||
Object result;
|
||||
try {
|
||||
result = renderer.apply(expression);
|
||||
result = renderer.apply(expression.trim());
|
||||
} catch (Exception e) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"Cannot render 'expression'. Cause: " + e.getMessage(),
|
||||
|
||||
@@ -86,7 +86,7 @@ public class SelectInput extends Input<String> implements RenderableInput {
|
||||
private List<String> renderExpressionValues(final Function<String, Object> renderer) {
|
||||
Object result;
|
||||
try {
|
||||
result = renderer.apply(expression);
|
||||
result = renderer.apply(expression.trim());
|
||||
} catch (Exception e) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"Cannot render 'expression'. Cause: " + e.getMessage(),
|
||||
|
||||
@@ -68,6 +68,19 @@ public class Property<T> {
|
||||
String getExpression() {
|
||||
return expression;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new {@link Property} with no cached rendered value,
|
||||
* so that the next render will evaluate its original Pebble expression.
|
||||
* <p>
|
||||
* The returned property will still cache its rendered result.
|
||||
* To re-evaluate on a subsequent render, call {@code skipCache()} again.
|
||||
*
|
||||
* @return a new {@link Property} without a pre-rendered value
|
||||
*/
|
||||
public Property<T> skipCache() {
|
||||
return Property.ofExpression(expression);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a new Property object with a value already set.<br>
|
||||
|
||||
20
core/src/main/java/io/kestra/core/models/tasks/Cache.java
Normal file
20
core/src/main/java/io/kestra/core/models/tasks/Cache.java
Normal file
@@ -0,0 +1,20 @@
|
||||
package io.kestra.core.models.tasks;
|
||||
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Introspected
|
||||
public class Cache {
|
||||
@NotNull
|
||||
private Boolean enabled;
|
||||
|
||||
private Duration ttl;
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.plugin.core.flow.WorkingDirectory;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.Size;
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -28,6 +29,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
@Plugin
|
||||
abstract public class Task implements TaskInterface {
|
||||
@Size(max = 256, message = "Task id must be at most 256 characters")
|
||||
protected String id;
|
||||
|
||||
protected String type;
|
||||
@@ -72,6 +74,10 @@ abstract public class Task implements TaskInterface {
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
private boolean allowWarning = false;
|
||||
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
@Valid
|
||||
private Cache taskCache;
|
||||
|
||||
public Optional<Task> findById(String id) {
|
||||
if (this.getId().equals(id)) {
|
||||
return Optional.of(this);
|
||||
|
||||
@@ -30,7 +30,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
* Helper class for task runners and script tasks.
|
||||
*/
|
||||
public final class ScriptService {
|
||||
private static final Pattern INTERNAL_STORAGE_PATTERN = Pattern.compile("(kestra:\\/\\/[-a-zA-Z0-9%._\\+~#=/]*)");
|
||||
private static final Pattern INTERNAL_STORAGE_PATTERN = Pattern.compile("(kestra:\\/\\/[-\\p{Alnum}._\\+~#=/]*)", Pattern.UNICODE_CHARACTER_CLASS);
|
||||
|
||||
// These are the three common additional variables task runners must provide for variable rendering.
|
||||
public static final String VAR_WORKING_DIR = "workingDir";
|
||||
|
||||
@@ -28,6 +28,7 @@ public interface QueueFactoryInterface {
|
||||
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
|
||||
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
|
||||
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
|
||||
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
|
||||
|
||||
QueueInterface<Execution> execution();
|
||||
|
||||
@@ -58,4 +59,6 @@ public interface QueueFactoryInterface {
|
||||
QueueInterface<SubflowExecutionResult> subflowExecutionResult();
|
||||
|
||||
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
|
||||
|
||||
QueueInterface<ExecutionRunning> executionRunning();
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import io.kestra.core.models.Pauseable;
|
||||
import io.kestra.core.utils.Either;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public interface QueueInterface<T> extends Closeable, Pauseable {
|
||||
@@ -18,7 +19,15 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
|
||||
emitAsync(null, message);
|
||||
}
|
||||
|
||||
void emitAsync(String consumerGroup, T message) throws QueueException;
|
||||
default void emitAsync(String consumerGroup, T message) throws QueueException {
|
||||
emitAsync(consumerGroup, List.of(message));
|
||||
}
|
||||
|
||||
default void emitAsync(List<T> messages) throws QueueException {
|
||||
emitAsync(null, messages);
|
||||
}
|
||||
|
||||
void emitAsync(String consumerGroup, List<T> messages) throws QueueException;
|
||||
|
||||
default void delete(T message) throws QueueException {
|
||||
delete(null, message);
|
||||
@@ -27,7 +36,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
|
||||
void delete(String consumerGroup, T message) throws QueueException;
|
||||
|
||||
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
|
||||
return receive((String) null, consumer);
|
||||
return receive(null, consumer, false);
|
||||
}
|
||||
|
||||
default Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {
|
||||
|
||||
@@ -27,8 +27,6 @@ public class QueueService {
|
||||
return ((Executor) object).getExecution().getId();
|
||||
} else if (object.getClass() == MetricEntry.class) {
|
||||
return null;
|
||||
} else if (object.getClass() == ExecutionRunning.class) {
|
||||
return ((ExecutionRunning) object).getExecution().getId();
|
||||
} else if (object.getClass() == SubflowExecutionEnd.class) {
|
||||
return ((SubflowExecutionEnd) object).getParentExecutionId();
|
||||
} else {
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
package io.kestra.core.queues;
|
||||
|
||||
import java.io.Serial;
|
||||
|
||||
public class UnsupportedMessageException extends QueueException {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public UnsupportedMessageException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,8 @@ public interface DashboardRepositoryInterface {
|
||||
|
||||
List<Dashboard> findAll(String tenantId);
|
||||
|
||||
List<Dashboard> findAllWithNoAcl(String tenantId);
|
||||
|
||||
default Dashboard save(Dashboard dashboard, String source) {
|
||||
return this.save(null, dashboard, source);
|
||||
}
|
||||
|
||||
@@ -159,4 +159,9 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
|
||||
CHILD,
|
||||
MAIN
|
||||
}
|
||||
|
||||
List<Execution> lastExecutions(
|
||||
String tenantId,
|
||||
@Nullable List<FlowFilter> flows
|
||||
);
|
||||
}
|
||||
|
||||
@@ -103,6 +103,8 @@ public interface FlowRepositoryInterface {
|
||||
|
||||
List<FlowWithSource> findAllWithSource(String tenantId);
|
||||
|
||||
List<FlowWithSource> findAllWithSourceWithNoAcl(String tenantId);
|
||||
|
||||
List<Flow> findAllForAllTenants();
|
||||
|
||||
List<FlowWithSource> findAllWithSourceForAllTenants();
|
||||
|
||||
@@ -10,5 +10,7 @@ public interface FlowTopologyRepositoryInterface {
|
||||
|
||||
List<FlowTopology> findByNamespace(String tenantId, String namespace);
|
||||
|
||||
List<FlowTopology> findAll(String tenantId);
|
||||
|
||||
FlowTopology save(FlowTopology flowTopology);
|
||||
}
|
||||
|
||||
@@ -82,6 +82,8 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
|
||||
Flux<LogEntry> findAsync(
|
||||
@Nullable String tenantId,
|
||||
@Nullable String namespace,
|
||||
@Nullable String flowId,
|
||||
@Nullable String executionId,
|
||||
@Nullable Level minLevel,
|
||||
ZonedDateTime startDate
|
||||
);
|
||||
@@ -96,5 +98,5 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
|
||||
|
||||
void deleteByQuery(String tenantId, String namespace, String flowId, String triggerId);
|
||||
|
||||
int deleteByQuery(String tenantId, String namespace, String flowId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate);
|
||||
int deleteByQuery(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate);
|
||||
}
|
||||
|
||||
@@ -12,6 +12,8 @@ public interface TemplateRepositoryInterface {
|
||||
|
||||
List<Template> findAll(String tenantId);
|
||||
|
||||
List<Template> findAllWithNoAcl(String tenantId);
|
||||
|
||||
List<Template> findAllForAllTenants();
|
||||
|
||||
ArrayListTotal<Template> find(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -11,7 +12,7 @@ import lombok.With;
|
||||
@Value
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class ExecutionRunning {
|
||||
public class ExecutionRunning implements HasUID {
|
||||
String tenantId;
|
||||
|
||||
@NotNull
|
||||
@@ -26,9 +27,10 @@ public class ExecutionRunning {
|
||||
@With
|
||||
ConcurrencyState concurrencyState;
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId, this.execution.getId());
|
||||
}
|
||||
|
||||
public enum ConcurrencyState { CREATED, RUNNING, QUEUED }
|
||||
public enum ConcurrencyState { CREATED, RUNNING, QUEUED, CANCELLED, FAILED }
|
||||
}
|
||||
|
||||
@@ -85,7 +85,8 @@ public class Executor {
|
||||
}
|
||||
|
||||
public Boolean canBeProcessed() {
|
||||
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null || this.getExecution().isDeleted() || this.getExecution().getState().isPaused());
|
||||
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
|
||||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
|
||||
}
|
||||
|
||||
public Executor withFlow(FlowWithSource flow) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.debug.Breakpoint;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.Label;
|
||||
@@ -18,6 +19,7 @@ import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.test.flow.TaskFixture;
|
||||
import io.kestra.core.trace.propagation.RunContextTextMapSetter;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import io.kestra.core.utils.TruthUtils;
|
||||
import io.kestra.plugin.core.flow.LoopUntil;
|
||||
import io.kestra.plugin.core.flow.Pause;
|
||||
@@ -100,49 +102,39 @@ public class ExecutorService {
|
||||
return this.flowExecutorInterface;
|
||||
}
|
||||
|
||||
public Executor checkConcurrencyLimit(Executor executor, FlowInterface flow, Execution execution, long count) {
|
||||
// if above the limit, handle concurrency limit based on its behavior
|
||||
if (count >= flow.getConcurrency().getLimit()) {
|
||||
public ExecutionRunning processExecutionRunning(FlowInterface flow, int runningCount, ExecutionRunning executionRunning) {
|
||||
// if concurrency was removed, it can be null as we always get the latest flow definition
|
||||
if (flow.getConcurrency() != null && runningCount >= flow.getConcurrency().getLimit()) {
|
||||
return switch (flow.getConcurrency().getBehavior()) {
|
||||
case QUEUE -> {
|
||||
var newExecution = execution.withState(State.Type.QUEUED);
|
||||
|
||||
ExecutionRunning executionRunning = ExecutionRunning.builder()
|
||||
.tenantId(flow.getTenantId())
|
||||
.namespace(flow.getNamespace())
|
||||
.flowId(flow.getId())
|
||||
.execution(newExecution)
|
||||
.concurrencyState(ExecutionRunning.ConcurrencyState.QUEUED)
|
||||
.build();
|
||||
|
||||
// when max concurrency is reached, we throttle the execution and stop processing
|
||||
logService.logExecution(
|
||||
newExecution,
|
||||
executionRunning.getExecution(),
|
||||
Level.INFO,
|
||||
"Flow is queued due to concurrency limit exceeded, {} running(s)",
|
||||
count
|
||||
"Execution is queued due to concurrency limit exceeded, {} running(s)",
|
||||
runningCount
|
||||
);
|
||||
// return the execution queued
|
||||
yield executor
|
||||
.withExecutionRunning(executionRunning)
|
||||
.withExecution(newExecution, "checkConcurrencyLimit");
|
||||
var newExecution = executionRunning.getExecution().withState(State.Type.QUEUED);
|
||||
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
|
||||
yield executionRunning
|
||||
.withExecution(newExecution)
|
||||
.withConcurrencyState(ExecutionRunning.ConcurrencyState.QUEUED);
|
||||
}
|
||||
case CANCEL ->
|
||||
executor.withExecution(execution.withState(State.Type.CANCELLED), "checkConcurrencyLimit");
|
||||
executionRunning
|
||||
.withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
|
||||
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
|
||||
case FAIL ->
|
||||
executor.withException(new IllegalStateException("Flow is FAILED due to concurrency limit exceeded"), "checkConcurrencyLimit");
|
||||
executionRunning
|
||||
.withExecution(executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded")).getExecution())
|
||||
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
// if under the limit, update the executor with a RUNNING ExecutionRunning to track them
|
||||
var executionRunning = new ExecutionRunning(
|
||||
flow.getTenantId(),
|
||||
flow.getNamespace(),
|
||||
flow.getId(),
|
||||
executor.getExecution(),
|
||||
ExecutionRunning.ConcurrencyState.RUNNING
|
||||
);
|
||||
return executor.withExecutionRunning(executionRunning);
|
||||
// if under the limit, run it!
|
||||
return executionRunning
|
||||
.withExecution(executionRunning.getExecution().withState(State.Type.RUNNING))
|
||||
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
|
||||
}
|
||||
|
||||
public Executor process(Executor executor) {
|
||||
@@ -245,9 +237,9 @@ public class ExecutorService {
|
||||
try {
|
||||
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
|
||||
} catch (Exception e) {
|
||||
// This will lead to the next task being still executed but at least Kestra will not crash.
|
||||
// This will lead to the next task being still executed, but at least Kestra will not crash.
|
||||
// This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
|
||||
runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), e);
|
||||
runContext.logger().error("Unable to resolve state from the Flowable task: {}", e.getMessage(), e);
|
||||
state = Optional.of(State.Type.FAILED);
|
||||
}
|
||||
Optional<WorkerTaskResult> endedTask = childWorkerTaskTypeToWorkerTask(
|
||||
@@ -260,8 +252,10 @@ public class ExecutorService {
|
||||
// Compute outputs for the parent Flowable task if a terminated state was resolved
|
||||
if (workerTaskResult.getTaskRun().getState().isTerminated()) {
|
||||
try {
|
||||
// as flowable tasks can save outputs during iterative execution, we must merge the maps here
|
||||
Output outputs = flowableParent.outputs(runContext);
|
||||
Variables variables = variablesService.of(StorageContext.forTask(workerTaskResult.getTaskRun()), outputs);
|
||||
Map<String, Object> outputMap = MapUtils.merge(workerTaskResult.getTaskRun().getOutputs(), outputs == null ? null : outputs.toMap());
|
||||
Variables variables = variablesService.of(StorageContext.forTask(workerTaskResult.getTaskRun()), outputMap);
|
||||
return Optional.of(new WorkerTaskResult(workerTaskResult
|
||||
.getTaskRun()
|
||||
.withOutputs(variables)
|
||||
@@ -595,6 +589,23 @@ public class ExecutorService {
|
||||
list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId()))
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
}
|
||||
|
||||
// If the task is a flowable and its terminated, check that all children are terminated.
|
||||
// This may not be the case for parallel flowable tasks like Parallel, Dag, ForEach...
|
||||
// After a fail task, some child flowable may not be correctly terminated.
|
||||
if (task instanceof FlowableTask<?> && taskRun.getState().isTerminated()) {
|
||||
List<TaskRun> updated = executor.getExecution().findChildren(taskRun).stream()
|
||||
.filter(child -> !child.getState().isTerminated())
|
||||
.map(throwFunction(child -> child.withState(taskRun.getState().getCurrent())))
|
||||
.toList();
|
||||
if (!updated.isEmpty()) {
|
||||
Execution execution = executor.getExecution();
|
||||
for (TaskRun child : updated) {
|
||||
execution = execution.withTaskRun(child);
|
||||
}
|
||||
executor = executor.withExecution(execution, "handledTerminatedFlowableTasks");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metricRegistry
|
||||
@@ -735,6 +746,7 @@ public class ExecutorService {
|
||||
List<TaskRun> afterExecutionNexts = FlowableUtils.resolveSequentialNexts(executor.getExecution(), afterExecutionResolvedTasks)
|
||||
.stream()
|
||||
.map(throwFunction(NextTaskRun::getTaskRun))
|
||||
.map(taskRun -> taskRun.withForceExecution(true)) // forceExecution so it would be executed even if the execution is killed
|
||||
.toList();
|
||||
if (!afterExecutionNexts.isEmpty()) {
|
||||
return executor.withTaskRun(afterExecutionNexts, "handleAfterExecution ");
|
||||
@@ -887,13 +899,38 @@ public class ExecutorService {
|
||||
this.addWorkerTaskResults(executor, workerTaskResults);
|
||||
}
|
||||
|
||||
|
||||
if (workerTasks.isEmpty() || hasMockedWorkerTask) {
|
||||
return executor;
|
||||
}
|
||||
|
||||
Executor executorToReturn = executor;
|
||||
|
||||
// suspend on breakpoint: if a breakpoint is for a CREATED taskrun, set the execution state to BREAKPOINT and ends here
|
||||
if (!ListUtils.isEmpty(executor.getExecution().getBreakpoints())) {
|
||||
List<Breakpoint> breakpoints = executor.getExecution().getBreakpoints();
|
||||
if (executor.getExecution()
|
||||
.getTaskRunList()
|
||||
.stream()
|
||||
.anyMatch(taskRun -> shouldSuspend(taskRun, breakpoints))
|
||||
) {
|
||||
List<TaskRun> newTaskRuns = executor.getExecution().getTaskRunList().stream().map(
|
||||
taskRun -> {
|
||||
if (shouldSuspend(taskRun, breakpoints)) {
|
||||
return taskRun.withState(State.Type.BREAKPOINT);
|
||||
}
|
||||
return taskRun;
|
||||
}
|
||||
).toList();
|
||||
Execution newExecution = executor.getExecution().withTaskRunList(newTaskRuns).withState(State.Type.BREAKPOINT);
|
||||
executorToReturn = executorToReturn.withExecution(newExecution, "handleBreakpoint");
|
||||
logService.logExecution(
|
||||
newExecution,
|
||||
Level.INFO,
|
||||
"Flow is suspended at a breakpoint."
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Ends FAILED or CANCELLED task runs by creating worker task results
|
||||
List<WorkerTask> endedTasks = workerTasks.get(true);
|
||||
if (endedTasks != null && !endedTasks.isEmpty()) {
|
||||
@@ -907,7 +944,7 @@ public class ExecutorService {
|
||||
|
||||
// Send other TaskRun to the worker (create worker tasks)
|
||||
List<WorkerTask> processingTasks = workerTasks.get(false);
|
||||
if (processingTasks != null && !processingTasks.isEmpty()) {
|
||||
if (processingTasks != null && !processingTasks.isEmpty() && !executor.getExecution().getState().isBreakpoint()) {
|
||||
executorToReturn = executorToReturn.withWorkerTasks(processingTasks, "handleWorkerTask");
|
||||
|
||||
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_TASKRUN_CREATED_COUNT, MetricRegistry.METRIC_EXECUTOR_TASKRUN_CREATED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution())).increment(processingTasks.size());
|
||||
@@ -916,6 +953,11 @@ public class ExecutorService {
|
||||
return executorToReturn;
|
||||
}
|
||||
|
||||
private boolean shouldSuspend(TaskRun taskRun, List<Breakpoint> breakpoints) {
|
||||
return taskRun.getState().getCurrent().isCreated() && breakpoints.stream()
|
||||
.anyMatch(breakpoint -> taskRun.getTaskId().equals(breakpoint.getId()) && (breakpoint.getValue() == null || Objects.equals(taskRun.getValue(), breakpoint.getValue())));
|
||||
}
|
||||
|
||||
private Executor handleExecutableTask(final Executor executor) {
|
||||
List<SubflowExecution<?>> executions = new ArrayList<>();
|
||||
List<SubflowExecutionResult> subflowExecutionResults = new ArrayList<>();
|
||||
@@ -1138,71 +1180,83 @@ public class ExecutorService {
|
||||
}
|
||||
|
||||
public void log(Logger log, Boolean in, WorkerJob value) {
|
||||
if (value instanceof WorkerTask workerTask) {
|
||||
log.debug(
|
||||
"{} {} : {}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
workerTask.getClass().getSimpleName(),
|
||||
workerTask.getTaskRun().toStringState()
|
||||
);
|
||||
} else if (value instanceof WorkerTrigger workerTrigger) {
|
||||
log.debug(
|
||||
"{} {} : {}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
workerTrigger.getClass().getSimpleName(),
|
||||
workerTrigger.getTriggerContext().uid()
|
||||
);
|
||||
if (log.isDebugEnabled()) { // taskRun().toStringState() is costly so we avoid calling it if not needed
|
||||
if (value instanceof WorkerTask workerTask) {
|
||||
log.debug(
|
||||
"{} {} : {}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
workerTask.getClass().getSimpleName(),
|
||||
workerTask.getTaskRun().toStringState()
|
||||
);
|
||||
} else if (value instanceof WorkerTrigger workerTrigger) {
|
||||
log.debug(
|
||||
"{} {} : {}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
workerTrigger.getClass().getSimpleName(),
|
||||
workerTrigger.getTriggerContext().uid()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void log(Logger log, Boolean in, WorkerTaskResult value) {
|
||||
log.debug(
|
||||
"{} {} : {}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
value.getClass().getSimpleName(),
|
||||
value.getTaskRun().toStringState()
|
||||
);
|
||||
if (log.isDebugEnabled()) { // taskRun().toStringState() is costly so we avoid calling it if not needed
|
||||
log.debug(
|
||||
"{} {} : {}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
value.getClass().getSimpleName(),
|
||||
value.getTaskRun().toStringState()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public void log(Logger log, Boolean in, SubflowExecutionResult value) {
|
||||
log.debug(
|
||||
"{} {} : {}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
value.getClass().getSimpleName(),
|
||||
value.getParentTaskRun().toStringState()
|
||||
);
|
||||
if (log.isDebugEnabled()) { // taskRun().toStringState() is costly so we avoid calling it if not needed
|
||||
log.debug(
|
||||
"{} {} : {}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
value.getClass().getSimpleName(),
|
||||
value.getParentTaskRun().toStringState()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public void log(Logger log, Boolean in, SubflowExecutionEnd value) {
|
||||
log.debug(
|
||||
"{} {} : {}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
value.getClass().getSimpleName(),
|
||||
value.toStringState()
|
||||
);
|
||||
if (log.isDebugEnabled()) { // taskRun().toStringState() is costly so we avoid calling it if not needed
|
||||
log.debug(
|
||||
"{} {} : {}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
value.getClass().getSimpleName(),
|
||||
value.toStringState()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public void log(Logger log, Boolean in, Execution value) {
|
||||
log.debug(
|
||||
"{} {} [key='{}']\n{}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
value.getClass().getSimpleName(),
|
||||
value.getId(),
|
||||
value.toStringState()
|
||||
);
|
||||
if (log.isDebugEnabled()) { // taskRun().toStringState() is costly so we avoid calling it if not needed
|
||||
log.debug(
|
||||
"{} {} [key='{}']\n{}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
value.getClass().getSimpleName(),
|
||||
value.getId(),
|
||||
value.toStringState()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public void log(Logger log, Boolean in, Executor value) {
|
||||
log.debug(
|
||||
"{} {} [key='{}', from='{}', offset='{}', crc32='{}']\n{}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
value.getClass().getSimpleName(),
|
||||
value.getExecution().getId(),
|
||||
value.getFrom(),
|
||||
value.getOffset(),
|
||||
value.getExecution().toCrc32State(),
|
||||
value.getExecution().toStringState()
|
||||
);
|
||||
if (log.isDebugEnabled()) { // taskRun().toStringState() is costly so we avoid calling it if not needed
|
||||
log.debug(
|
||||
"{} {} [key='{}', from='{}', offset='{}', crc32='{}']\n{}",
|
||||
in ? "<< IN " : ">> OUT",
|
||||
value.getClass().getSimpleName(),
|
||||
value.getExecution().getId(),
|
||||
value.getFrom(),
|
||||
value.getOffset(),
|
||||
value.getExecution().toCrc32State(),
|
||||
value.getExecution().toStringState()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public void log(Logger log, Boolean in, ExecutionKilledExecution value) {
|
||||
|
||||
@@ -138,19 +138,30 @@ public class FlowInputOutput {
|
||||
.publishOn(Schedulers.boundedElastic())
|
||||
.<AbstractMap.SimpleEntry<String, String>>handle((input, sink) -> {
|
||||
if (input instanceof CompletedFileUpload fileUpload) {
|
||||
boolean oldStyleInput = false;
|
||||
if ("files".equals(fileUpload.getName())) {
|
||||
// we are maybe in an old-style usage of the input, let's check if there is an input named after the filename
|
||||
oldStyleInput = inputs.stream().anyMatch(i -> i.getId().equals(fileUpload.getFilename()));
|
||||
}
|
||||
if (oldStyleInput) {
|
||||
var runContext = runContextFactory.of(null, execution);
|
||||
runContext.logger().warn("Using a deprecated way to upload a FILE input. You must set the input 'id' as part name and set the name of the file using the regular 'filename' part attribute.");
|
||||
}
|
||||
String inputId = oldStyleInput ? fileUpload.getFilename() : fileUpload.getName();
|
||||
String fileName = oldStyleInput ? FileInput.findFileInputExtension(inputs, fileUpload.getFilename()) : fileUpload.getFilename();
|
||||
|
||||
if (!uploadFiles) {
|
||||
final String fileExtension = FileInput.findFileInputExtension(inputs, fileUpload.getFilename());
|
||||
URI from = URI.create("kestra://" + StorageContext
|
||||
.forInput(execution, fileUpload.getFilename(), fileUpload.getFilename() + fileExtension)
|
||||
.forInput(execution, inputId, fileName)
|
||||
.getContextStorageURI()
|
||||
);
|
||||
fileUpload.discard();
|
||||
sink.next(new AbstractMap.SimpleEntry<>(fileUpload.getFilename(), from.toString()));
|
||||
sink.next(new AbstractMap.SimpleEntry<>(inputId, from.toString()));
|
||||
} else {
|
||||
try {
|
||||
final String fileExtension = FileInput.findFileInputExtension(inputs, fileUpload.getFilename());
|
||||
final String fileExtension = FileInput.findFileInputExtension(inputs, fileName);
|
||||
|
||||
String prefix = StringUtils.leftPad(fileUpload.getFilename() + "_", 3, "_");
|
||||
String prefix = StringUtils.leftPad(fileName + "_", 3, "_");
|
||||
File tempFile = File.createTempFile(prefix, fileExtension);
|
||||
try (var inputStream = fileUpload.getInputStream();
|
||||
var outputStream = new FileOutputStream(tempFile)) {
|
||||
@@ -159,8 +170,8 @@ public class FlowInputOutput {
|
||||
sink.error(new KestraRuntimeException("Can't upload file: " + fileUpload.getFilename()));
|
||||
return;
|
||||
}
|
||||
URI from = storageInterface.from(execution, fileUpload.getFilename(), tempFile);
|
||||
sink.next(new AbstractMap.SimpleEntry<>(fileUpload.getFilename(), from.toString()));
|
||||
URI from = storageInterface.from(execution, inputId, fileName, tempFile);
|
||||
sink.next(new AbstractMap.SimpleEntry<>(inputId, from.toString()));
|
||||
} finally {
|
||||
if (!tempFile.delete()) {
|
||||
tempFile.deleteOnExit();
|
||||
@@ -429,7 +440,7 @@ public class FlowInputOutput {
|
||||
if (URIFetcher.supports(uri)) {
|
||||
yield uri;
|
||||
} else {
|
||||
yield storageInterface.from(execution, id, new File(current.toString()));
|
||||
yield storageInterface.from(execution, id, current.toString().substring(current.toString().lastIndexOf("/") + 1), new File(current.toString()));
|
||||
}
|
||||
}
|
||||
case JSON -> JacksonMapper.toObject(current.toString());
|
||||
|
||||
@@ -286,18 +286,10 @@ public class FlowableUtils {
|
||||
|
||||
// start as many tasks as we have concurrency slots
|
||||
return collect.values().stream()
|
||||
.map(resolvedTasks -> filterCreated(resolvedTasks, taskRuns, parentTaskRun))
|
||||
.map(resolvedTasks -> resolveSequentialNexts(execution, resolvedTasks, null, null, parentTaskRun))
|
||||
.filter(resolvedTasks -> !resolvedTasks.isEmpty())
|
||||
.limit(concurrencySlots)
|
||||
.map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
|
||||
.toList();
|
||||
}
|
||||
|
||||
private static List<ResolvedTask> filterCreated(List<ResolvedTask> tasks, List<TaskRun> taskRuns, TaskRun parentTaskRun) {
|
||||
return tasks.stream()
|
||||
.filter(resolvedTask -> taskRuns.stream()
|
||||
.noneMatch(taskRun -> FlowableUtils.isTaskRunFor(resolvedTask, taskRun, parentTaskRun))
|
||||
)
|
||||
.map(resolvedTasks -> resolvedTasks.getFirst())
|
||||
.toList();
|
||||
}
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
||||
private static final int MAX_MESSAGE_LENGTH = 1024 * 10;
|
||||
private static final int MAX_MESSAGE_LENGTH = 1024 * 15;
|
||||
public static final String ORIGINAL_TIMESTAMP_KEY = "originalTimestamp";
|
||||
|
||||
private final String loggerName;
|
||||
@@ -80,7 +80,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
||||
}
|
||||
|
||||
List<LogEntry> result = new ArrayList<>();
|
||||
long i = 0;
|
||||
for (String s : split) {
|
||||
result.add(LogEntry.builder()
|
||||
.namespace(logEntry.getNamespace())
|
||||
@@ -98,7 +97,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
||||
.thread(event.getThreadName())
|
||||
.build()
|
||||
);
|
||||
i++;
|
||||
}
|
||||
|
||||
return result;
|
||||
@@ -331,14 +329,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
||||
protected void append(ILoggingEvent e) {
|
||||
e = this.transform(e);
|
||||
|
||||
logEntries(e, logEntry)
|
||||
.forEach(l -> {
|
||||
try {
|
||||
logQueue.emitAsync(l);
|
||||
} catch (QueueException ex) {
|
||||
log.warn("Unable to emit logQueue", ex);
|
||||
}
|
||||
});
|
||||
try {
|
||||
logQueue.emitAsync(logEntries(e, logEntry));
|
||||
} catch (QueueException ex) {
|
||||
log.warn("Unable to emit logQueue", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,15 +4,11 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import jakarta.validation.ConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.Validator;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@@ -27,12 +23,19 @@ public class RunContextProperty<T> {
|
||||
private final RunContext runContext;
|
||||
private final Task task;
|
||||
private final AbstractTrigger trigger;
|
||||
|
||||
private final boolean skipCache;
|
||||
|
||||
RunContextProperty(Property<T> property, RunContext runContext) {
|
||||
this(property, runContext, false);
|
||||
}
|
||||
|
||||
RunContextProperty(Property<T> property, RunContext runContext, boolean skipCache) {
|
||||
this.property = property;
|
||||
this.runContext = runContext;
|
||||
this.task = ((DefaultRunContext) runContext).getTask();
|
||||
this.trigger = ((DefaultRunContext) runContext).getTrigger();
|
||||
this.skipCache = skipCache;
|
||||
}
|
||||
|
||||
private void validate() {
|
||||
@@ -45,6 +48,19 @@ public class RunContextProperty<T> {
|
||||
log.trace("Unable to do validation: no task or trigger found");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new {@link RunContextProperty} that will always be rendered by evaluating
|
||||
* its original Pebble expression, without using any previously cached value.
|
||||
* <p>
|
||||
* This ensures that each time the property is rendered, the underlying
|
||||
* expression is re-evaluated to produce a fresh result.
|
||||
*
|
||||
* @return a new {@link Property} that bypasses the cache
|
||||
*/
|
||||
public RunContextProperty<T> skipCache() {
|
||||
return new RunContextProperty<>(this.property, this.runContext, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Render a property then convert it to its target type and validate it.<br>
|
||||
@@ -55,13 +71,13 @@ public class RunContextProperty<T> {
|
||||
* Warning, due to the caching mechanism, this method is not thread-safe.
|
||||
*/
|
||||
public Optional<T> as(Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz)));
|
||||
|
||||
validate();
|
||||
return as;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Render a property with additional variables, then convert it to its target type and validate it.<br>
|
||||
*
|
||||
@@ -71,7 +87,7 @@ public class RunContextProperty<T> {
|
||||
* Warning, due to the caching mechanism, this method is not thread-safe.
|
||||
*/
|
||||
public Optional<T> as(Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz, variables)));
|
||||
|
||||
validate();
|
||||
@@ -89,7 +105,7 @@ public class RunContextProperty<T> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <I> T asList(Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz)))
|
||||
.orElse((T) Collections.emptyList());
|
||||
|
||||
@@ -108,7 +124,7 @@ public class RunContextProperty<T> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <I> T asList(Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz, variables)))
|
||||
.orElse((T) Collections.emptyList());
|
||||
|
||||
@@ -127,7 +143,7 @@ public class RunContextProperty<T> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass)))
|
||||
.orElse((T) Collections.emptyMap());
|
||||
|
||||
@@ -146,11 +162,15 @@ public class RunContextProperty<T> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass, variables)))
|
||||
.orElse((T) Collections.emptyMap());
|
||||
|
||||
validate();
|
||||
return as;
|
||||
}
|
||||
|
||||
private Property<T> getProperty() {
|
||||
return skipCache ? this.property.skipCache() : this.property;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,7 +215,7 @@ public final class RunVariables {
|
||||
|
||||
executionMap.put("id", execution.getId());
|
||||
|
||||
if (execution.getState() != null) { // can occurs in tests
|
||||
if (execution.getState() != null) { // can occur in tests
|
||||
executionMap.put("state", execution.getState().getCurrent());
|
||||
}
|
||||
|
||||
@@ -225,6 +225,10 @@ public final class RunVariables {
|
||||
Optional.ofNullable(execution.getOriginalId())
|
||||
.ifPresent(originalId -> executionMap.put("originalId", originalId));
|
||||
|
||||
if (execution.getOutputs() != null) {
|
||||
executionMap.put("outputs", execution.getOutputs());
|
||||
}
|
||||
|
||||
builder.put("execution", executionMap.build());
|
||||
|
||||
if (execution.getTaskRunList() != null) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Throwables;
|
||||
@@ -44,7 +45,14 @@ import org.apache.commons.lang3.time.StopWatch;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
@@ -57,6 +65,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipInputStream;
|
||||
import java.util.zip.ZipOutputStream;
|
||||
|
||||
import static io.kestra.core.models.flows.State.Type.*;
|
||||
import static io.kestra.core.server.Service.ServiceState.TERMINATED_FORCED;
|
||||
@@ -497,14 +508,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
Execution execution = workerTrigger.getTrigger().isFailOnTriggerError() ? TriggerService.generateExecution(workerTrigger.getTrigger(), workerTrigger.getConditionContext(), workerTrigger.getTriggerContext(), (Output) null)
|
||||
.withState(FAILED) : null;
|
||||
if (execution != null) {
|
||||
RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution))
|
||||
.forEach(log -> {
|
||||
try {
|
||||
logQueue.emitAsync(log);
|
||||
} catch (QueueException ex) {
|
||||
// fail silently
|
||||
}
|
||||
});
|
||||
try {
|
||||
logQueue.emitAsync(RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution)));
|
||||
} catch (QueueException ex) {
|
||||
// fail silently
|
||||
}
|
||||
}
|
||||
this.workerTriggerResultQueue.emit(
|
||||
WorkerTriggerResult.builder()
|
||||
@@ -657,7 +665,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
));
|
||||
}
|
||||
|
||||
if (killedExecution.contains(workerTask.getTaskRun().getExecutionId())) {
|
||||
if (! Boolean.TRUE.equals(workerTask.getTaskRun().getForceExecution()) && killedExecution.contains(workerTask.getTaskRun().getExecutionId())) {
|
||||
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun().withState(KILLED));
|
||||
try {
|
||||
this.workerTaskResultQueue.emit(workerTaskResult);
|
||||
@@ -682,9 +690,44 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
|
||||
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(RUNNING));
|
||||
|
||||
DefaultRunContext runContext = runContextInitializer.forWorker((DefaultRunContext) workerTask.getRunContext(), workerTask);
|
||||
Optional<String> hash = Optional.empty();
|
||||
if (workerTask.getTask().getTaskCache() != null && workerTask.getTask().getTaskCache().getEnabled()) {
|
||||
runContext.logger().debug("Task output caching is enabled for task '{}''", workerTask.getTask().getId());
|
||||
hash = hashTask(runContext, workerTask.getTask());
|
||||
if (hash.isPresent()) {
|
||||
try {
|
||||
Optional<InputStream> cacheFile = runContext.storage().getCacheFile(hash.get(), workerTask.getTaskRun().getValue(), workerTask.getTask().getTaskCache().getTtl());
|
||||
if (cacheFile.isPresent()) {
|
||||
runContext.logger().info("Skipping task execution for task '{}' as there is an existing cache entry for it", workerTask.getTask().getId());
|
||||
try (ZipInputStream archive = new ZipInputStream(cacheFile.get())) {
|
||||
if (archive.getNextEntry() != null) {
|
||||
byte[] cache = archive.readAllBytes();
|
||||
Map<String, Object> outputMap = JacksonMapper.ofIon().readValue(cache, JacksonMapper.MAP_TYPE_REFERENCE);
|
||||
Variables variables = variablesService.of(StorageContext.forTask(workerTask.getTaskRun()), outputMap);
|
||||
|
||||
TaskRunAttempt attempt = TaskRunAttempt.builder()
|
||||
.state(new io.kestra.core.models.flows.State().withState(SUCCESS))
|
||||
.workerId(this.id)
|
||||
.build();
|
||||
List<TaskRunAttempt> attempts = this.addAttempt(workerTask, attempt);
|
||||
TaskRun taskRun = workerTask.getTaskRun().withAttempts(attempts).withOutputs(variables).withState(SUCCESS);
|
||||
WorkerTaskResult workerTaskResult = new WorkerTaskResult(taskRun);
|
||||
this.workerTaskResultQueue.emit(workerTaskResult);
|
||||
return workerTaskResult;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException | RuntimeException | QueueException e) {
|
||||
// in case of any exception, log an error and continue
|
||||
runContext.logger().error("Unexpected exception while loading the cache for task '{}', the task will be executed instead.", workerTask.getTask().getId(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
// run
|
||||
workerTask = this.runAttempt(workerTask);
|
||||
workerTask = this.runAttempt(runContext, workerTask);
|
||||
|
||||
// get last state
|
||||
TaskRunAttempt lastAttempt = workerTask.getTaskRun().lastAttempt();
|
||||
@@ -718,7 +761,30 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
|
||||
|
||||
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
|
||||
|
||||
this.workerTaskResultQueue.emit(workerTaskResult);
|
||||
|
||||
// upload the cache file, hash may not be present if we didn't succeed in computing it
|
||||
if (workerTask.getTask().getTaskCache() != null && workerTask.getTask().getTaskCache().getEnabled() && hash.isPresent() &&
|
||||
(state == State.Type.SUCCESS || state == State.Type.WARNING)) {
|
||||
runContext.logger().info("Uploading a cache entry for task '{}'", workerTask.getTask().getId());
|
||||
|
||||
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
ZipOutputStream archive = new ZipOutputStream(bos)) {
|
||||
var zipEntry = new ZipEntry("outputs.ion");
|
||||
archive.putNextEntry(zipEntry);
|
||||
archive.write(JacksonMapper.ofIon().writeValueAsBytes(workerTask.getTaskRun().getOutputs()));
|
||||
archive.closeEntry();
|
||||
archive.finish();
|
||||
Path archiveFile = runContext.workingDir().createTempFile( ".zip");
|
||||
Files.write(archiveFile, bos.toByteArray());
|
||||
URI uri = runContext.storage().putCacheFile(archiveFile.toFile(), hash.get(), workerTask.getTaskRun().getValue());
|
||||
runContext.logger().debug("Caching entry uploaded in URI {}", uri);
|
||||
} catch (IOException | RuntimeException e) {
|
||||
// in case of any exception, log an error and continue
|
||||
runContext.logger().error("Unexpected exception while uploading the cache entry for task '{}', the task not be cached.", workerTask.getTask().getId(), e);
|
||||
}
|
||||
}
|
||||
return workerTaskResult;
|
||||
} catch (QueueException e) {
|
||||
// If there is a QueueException it can either be caused by the message limit or another queue issue.
|
||||
@@ -728,6 +794,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
// If it's a message too big, we remove the outputs
|
||||
failed = failed.withOutputs(Variables.empty());
|
||||
}
|
||||
if (e instanceof UnsupportedMessageException) {
|
||||
// we expect the offending char is in the output so we remove it
|
||||
failed = failed.withOutputs(Variables.empty());
|
||||
}
|
||||
WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
|
||||
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
|
||||
contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
|
||||
@@ -747,6 +817,26 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
private Optional<String> hashTask(RunContext runContext, Task task) {
|
||||
try {
|
||||
var map = JacksonMapper.toMap(task);
|
||||
// If there are task provided variables, rendering the task may fail.
|
||||
// The best we can do is to add a fake 'workingDir' as it's an often added variables,
|
||||
// and it should not be part of the task hash.
|
||||
Map<String, Object> variables = Map.of("workingDir", "workingDir");
|
||||
var rMap = runContext.render(map, variables);
|
||||
var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
|
||||
MessageDigest digest = MessageDigest.getInstance("SHA-256");
|
||||
digest.update(json);
|
||||
byte[] bytes = digest.digest();
|
||||
return Optional.of(HexFormat.of().formatHex(bytes));
|
||||
} catch (RuntimeException | IllegalVariableEvaluationException | JsonProcessingException |
|
||||
NoSuchAlgorithmException e) {
|
||||
runContext.logger().error("Unable to create the cache key for the task '{}'", task.getId(), e);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
private List<TaskRun> dynamicWorkerResults(List<WorkerTaskResult> dynamicWorkerResults) {
|
||||
return dynamicWorkerResults
|
||||
.stream()
|
||||
@@ -802,9 +892,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
private WorkerTask runAttempt(final WorkerTask workerTask) throws QueueException {
|
||||
DefaultRunContext runContext = runContextInitializer.forWorker((DefaultRunContext) workerTask.getRunContext(), workerTask);
|
||||
|
||||
private WorkerTask runAttempt(RunContext runContext, final WorkerTask workerTask) throws QueueException {
|
||||
Logger logger = runContext.logger();
|
||||
|
||||
if (!(workerTask.getTask() instanceof RunnableTask<?> task)) {
|
||||
@@ -1066,18 +1154,16 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method should only be used on tests.
|
||||
* It shut down the worker without waiting for tasks to end,
|
||||
* and without closing the queue, so tests can launch and shutdown a worker manually without closing the queue.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public void shutdown() {
|
||||
// initiate shutdown
|
||||
shutdown.compareAndSet(false, true);
|
||||
|
||||
try {
|
||||
// close the WorkerJob queue to stop receiving new JobTask execution.
|
||||
workerJobQueue.close();
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to close the WorkerJobQueue");
|
||||
}
|
||||
|
||||
// close all queues and shutdown now
|
||||
this.receiveCancellations.forEach(Runnable::run);
|
||||
this.executorService.shutdownNow();
|
||||
|
||||
@@ -17,9 +17,11 @@ public abstract class AbstractDate {
|
||||
|
||||
private static final Map<String, DateTimeFormatter> FORMATTERS = ImmutableMap.<String, DateTimeFormatter>builder()
|
||||
.put("iso", DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX"))
|
||||
.put("iso_milli", DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
|
||||
.put("iso_sec", DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX"))
|
||||
.put("sql", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"))
|
||||
.put("sql_seq", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
|
||||
.put("sql_milli", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))
|
||||
.put("sql_sec", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
|
||||
.put("iso_date_time", DateTimeFormatter.ISO_DATE_TIME)
|
||||
.put("iso_date", DateTimeFormatter.ISO_DATE)
|
||||
.put("iso_time", DateTimeFormatter.ISO_TIME)
|
||||
@@ -100,6 +102,19 @@ public abstract class AbstractDate {
|
||||
}
|
||||
|
||||
if (value instanceof Long longValue) {
|
||||
if(value.toString().length() == 13) {
|
||||
return Instant.ofEpochMilli(longValue).atZone(zoneId);
|
||||
}else if(value.toString().length() == 19 ){
|
||||
if(value.toString().endsWith("000")){
|
||||
long seconds = longValue/1_000_000_000;
|
||||
int nanos = (int) (longValue%1_000_000_000);
|
||||
return Instant.ofEpochSecond(seconds,nanos).atZone(zoneId);
|
||||
}else{
|
||||
long milliseconds = longValue/1_000_000;
|
||||
int micros = (int) (longValue%1_000_000);
|
||||
return Instant.ofEpochMilli(milliseconds).atZone(zoneId).withNano(micros*1000);
|
||||
}
|
||||
}
|
||||
return Instant.ofEpochSecond(longValue).atZone(zoneId);
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.runners.pebble;
|
||||
|
||||
import io.kestra.core.runners.pebble.expression.NullCoalescingExpression;
|
||||
import io.kestra.core.runners.pebble.expression.UndefinedCoalescingExpression;
|
||||
import io.kestra.core.runners.pebble.expression.InExpression;
|
||||
import io.kestra.core.runners.pebble.filters.*;
|
||||
import io.kestra.core.runners.pebble.functions.*;
|
||||
import io.kestra.core.runners.pebble.tests.JsonTest;
|
||||
@@ -76,6 +77,7 @@ public class Extension extends AbstractExtension {
|
||||
|
||||
operators.add(new BinaryOperatorImpl("??", 120, NullCoalescingExpression::new, NORMAL, Associativity.LEFT));
|
||||
operators.add(new BinaryOperatorImpl("???", 120, UndefinedCoalescingExpression::new, NORMAL, Associativity.LEFT));
|
||||
operators.add(new BinaryOperatorImpl("isIn", 120, InExpression::new, NORMAL, Associativity.LEFT));
|
||||
|
||||
return operators;
|
||||
}
|
||||
@@ -91,6 +93,7 @@ public class Extension extends AbstractExtension {
|
||||
filters.put("dateAdd", new DateAddFilter());
|
||||
filters.put("timestamp", new TimestampFilter());
|
||||
filters.put("timestampMicro", new TimestampMicroFilter());
|
||||
filters.put("timestampMilli", new TimestampMilliFilter());
|
||||
filters.put("timestampNano", new TimestampNanoFilter());
|
||||
filters.put("jq", new JqFilter());
|
||||
filters.put("escapeChar", new EscapeCharFilter());
|
||||
@@ -155,6 +158,7 @@ public class Extension extends AbstractExtension {
|
||||
functions.put("fetchContext", new FetchContextFunction());
|
||||
functions.put("uuid", new UUIDFunction());
|
||||
functions.put("id", new IDFunction());
|
||||
functions.put("ksuid", new KSUIDFunction());
|
||||
functions.put("fromIon", new FromIonFunction());
|
||||
functions.put("fileSize", fileSizeFunction);
|
||||
if (this.errorLogsFunction != null) {
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
package io.kestra.core.runners.pebble.expression;
|
||||
|
||||
import io.pebbletemplates.pebble.node.expression.BinaryExpression;
|
||||
import io.pebbletemplates.pebble.node.expression.Expression;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContextImpl;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplateImpl;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
public class InExpression extends BinaryExpression<Object> {
|
||||
public InExpression() {
|
||||
}
|
||||
|
||||
public InExpression(Expression<?> left, Expression<?> right) {
|
||||
super(left, right);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object evaluate(PebbleTemplateImpl self, EvaluationContextImpl context) {
|
||||
Object leftValue = getLeftExpression().evaluate(self, context);
|
||||
Object rightValue = getRightExpression().evaluate(self, context);
|
||||
|
||||
if (leftValue == null || rightValue == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
switch (rightValue) {
|
||||
case Collection<?> objects -> {
|
||||
return objects.stream().map(Object::toString).toList().contains(leftValue.toString());
|
||||
}
|
||||
case Iterable<?> objects -> {
|
||||
for (Object item : objects) {
|
||||
if (Objects.equals(item.toString(), leftValue.toString())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
case Map<?, ?> map -> {
|
||||
for (Map.Entry<?, ?> entry : map.entrySet()) {
|
||||
if (Objects.equals(entry.getKey().toString(), leftValue.toString()) ||
|
||||
Objects.equals(entry.getValue().toString(), leftValue.toString())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
default -> {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s isIn %s", getLeftExpression(), getRightExpression());
|
||||
}
|
||||
}
|
||||
@@ -24,7 +24,7 @@ public class DateAddFilter extends AbstractDate implements Filter {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Long amount = (Long) args.get("amount");
|
||||
final Long amount = getAsLong(args.get("amount"), lineNumber, self);
|
||||
final String unit = (String) args.get("unit");
|
||||
final String timeZone = (String) args.get("timeZone");
|
||||
final String existingFormat = (String) args.get("existingFormat");
|
||||
@@ -36,4 +36,24 @@ public class DateAddFilter extends AbstractDate implements Filter {
|
||||
|
||||
return format(plus, args, context);
|
||||
}
|
||||
|
||||
public static Long getAsLong(Object value, int lineNumber, PebbleTemplate self) {
|
||||
if (value instanceof Long longValue) {
|
||||
return longValue;
|
||||
} else if (value instanceof Integer integerValue) {
|
||||
return integerValue.longValue();
|
||||
} else if (value instanceof Number numberValue) {
|
||||
return numberValue.longValue();
|
||||
} else if (value instanceof String stringValue) {
|
||||
try {
|
||||
return Long.parseLong(stringValue);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new PebbleException(e, "%s can't be converted to long".formatted(stringValue),
|
||||
lineNumber, self != null ? self.getName() : "Unknown");
|
||||
}
|
||||
}
|
||||
throw new PebbleException(null, "Incorrect %s format, must be a number".formatted(value),
|
||||
lineNumber, self != null ? self.getName() : "Unknown");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
package io.kestra.core.runners.pebble.filters;
|
||||
|
||||
import io.kestra.core.runners.pebble.AbstractDate;
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
import io.pebbletemplates.pebble.extension.Filter;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplate;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class TimestampMilliFilter extends AbstractDate implements Filter {
|
||||
@Override
|
||||
public List<String> getArgumentNames() {
|
||||
return List.of("timeZone", "existingFormat");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object apply(Object input, Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) throws PebbleException {
|
||||
if (input == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final String timeZone = (String) args.get("timeZone");
|
||||
final String existingFormat = (String) args.get("existingFormat");
|
||||
|
||||
ZoneId zoneId = zoneId(timeZone);
|
||||
ZonedDateTime date = convert(input, zoneId, existingFormat);
|
||||
|
||||
return date.toInstant().toEpochMilli();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import com.github.ksuid.KsuidGenerator;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplate;
|
||||
|
||||
import java.security.SecureRandom;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This function implements the 'ksuid' function which generates a K-Sortable Unique IDentifier.
|
||||
* KSUID is a 20-byte identifier: 4 bytes of timestamp + 16 random bytes, encoded as base62.
|
||||
*
|
||||
* @see <a href="https://github.com/segmentio/ksuid">https://github.com/segmentio/ksuid</a>
|
||||
* @see <a href="https://github.com/ksuid/ksuid">https://github.com/ksuid/ksuid</a>
|
||||
*/
|
||||
public class KSUIDFunction implements Function {
|
||||
private static final KsuidGenerator KSUID_GENERATOR = new KsuidGenerator(new SecureRandom());
|
||||
|
||||
@Override
|
||||
public Object execute(
|
||||
Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) {
|
||||
return generateKsuid();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getArgumentNames() {
|
||||
return List.of();
|
||||
}
|
||||
|
||||
private String generateKsuid() {
|
||||
return KSUID_GENERATOR.newKsuid().toString();
|
||||
}
|
||||
}
|
||||
@@ -1,18 +1,21 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import com.fasterxml.uuid.Generators;
|
||||
import com.fasterxml.uuid.impl.TimeBasedEpochRandomGenerator;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplate;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
public class UUIDFunction implements Function {
|
||||
|
||||
private static final TimeBasedEpochRandomGenerator generator = Generators.timeBasedEpochRandomGenerator();
|
||||
|
||||
@Override
|
||||
public Object execute(
|
||||
Map<String, Object> args, PebbleTemplate self, EvaluationContext context, int lineNumber) {
|
||||
return UUID.randomUUID().toString();
|
||||
return generator.generate().toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.kestra.core.events.CrudEventType;
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
@@ -29,10 +30,7 @@ import io.kestra.core.server.Service;
|
||||
import io.kestra.core.server.ServiceStateChangeEvent;
|
||||
import io.kestra.core.server.ServiceType;
|
||||
import io.kestra.core.services.*;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.Either;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.*;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||
import io.micronaut.core.util.CollectionUtils;
|
||||
@@ -91,7 +89,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
private volatile Boolean isReady = false;
|
||||
|
||||
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
private ScheduledFuture<?> executionMonitorFuture;
|
||||
|
||||
@Getter
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
@@ -152,7 +152,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
this.flowListeners.run();
|
||||
this.flowListeners.listen(this::initializedTriggers);
|
||||
|
||||
ScheduledFuture<?> evaluationLoop = scheduleExecutor.scheduleAtFixedRate(
|
||||
scheduledFuture = scheduleExecutor.scheduleAtFixedRate(
|
||||
this::handle,
|
||||
0,
|
||||
1,
|
||||
@@ -162,10 +162,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
// look at exception on the evaluation loop thread
|
||||
Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
|
||||
() -> {
|
||||
Await.until(evaluationLoop::isDone);
|
||||
Await.until(scheduledFuture::isDone);
|
||||
|
||||
try {
|
||||
evaluationLoop.get();
|
||||
scheduledFuture.get();
|
||||
} catch (CancellationException ignored) {
|
||||
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
@@ -177,7 +177,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
);
|
||||
|
||||
// Periodically report metrics and logs of running executions
|
||||
ScheduledFuture<?> monitoringLoop = executionMonitorExecutor.scheduleWithFixedDelay(
|
||||
executionMonitorFuture = executionMonitorExecutor.scheduleWithFixedDelay(
|
||||
this::executionMonitor,
|
||||
30,
|
||||
10,
|
||||
@@ -187,10 +187,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
// look at exception on the monitoring loop thread
|
||||
Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
|
||||
() -> {
|
||||
Await.until(monitoringLoop::isDone);
|
||||
Await.until(executionMonitorFuture::isDone);
|
||||
|
||||
try {
|
||||
monitoringLoop.get();
|
||||
executionMonitorFuture.get();
|
||||
} catch (CancellationException ignored) {
|
||||
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
@@ -318,7 +318,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
}
|
||||
|
||||
synchronized (this) { // we need a sync block as we read then update so we should not do it in multiple threads concurrently
|
||||
List<Trigger> triggers = triggerState.findAllForAllTenants();
|
||||
Map<String, Trigger> triggers = triggerState.findAllForAllTenants().stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
|
||||
|
||||
flows
|
||||
.stream()
|
||||
@@ -328,7 +328,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.flatMap(flow -> flow.getTriggers().stream().filter(trigger -> trigger instanceof WorkerTriggerInterface).map(trigger -> new FlowAndTrigger(flow, trigger)))
|
||||
.distinct()
|
||||
.forEach(flowAndTrigger -> {
|
||||
Optional<Trigger> trigger = triggers.stream().filter(t -> t.uid().equals(Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger()))).findFirst(); // must have one or none
|
||||
String triggerUid = Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger());
|
||||
Optional<Trigger> trigger = Optional.ofNullable(triggers.get(triggerUid));
|
||||
if (trigger.isEmpty()) {
|
||||
RunContext runContext = runContextFactory.of(flowAndTrigger.flow(), flowAndTrigger.trigger());
|
||||
ConditionContext conditionContext = conditionService.conditionContext(runContext, flowAndTrigger.flow(), null);
|
||||
@@ -467,9 +468,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
private List<FlowWithTriggers> computeSchedulable(List<FlowWithSource> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
|
||||
List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
|
||||
List<String> flowIds = flows.stream().map(FlowId::uidWithoutRevision).toList();
|
||||
Map<String, Trigger> triggerById = triggerContextsToEvaluate.stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
|
||||
|
||||
// delete trigger which flow has been deleted
|
||||
triggerContextsToEvaluate.stream()
|
||||
.filter(trigger -> !flows.stream().map(FlowId::uidWithoutRevision).toList().contains(FlowId.uid(trigger)))
|
||||
.filter(trigger -> !flowIds.contains(FlowId.uid(trigger)))
|
||||
.forEach(trigger -> {
|
||||
try {
|
||||
this.triggerState.delete(trigger);
|
||||
@@ -491,12 +495,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.map(abstractTrigger -> {
|
||||
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
|
||||
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
|
||||
Trigger triggerContext = null;
|
||||
Trigger lastTrigger = triggerContextsToEvaluate
|
||||
.stream()
|
||||
.filter(triggerContextToFind -> triggerContextToFind.uid().equals(Trigger.uid(flow, abstractTrigger)))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
Trigger triggerContext;
|
||||
Trigger lastTrigger = triggerById.get(Trigger.uid(flow, abstractTrigger));
|
||||
// If a trigger is not found in triggers to evaluate, then we ignore it
|
||||
if (lastTrigger == null) {
|
||||
return null;
|
||||
@@ -1006,8 +1006,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
setState(ServiceState.TERMINATING);
|
||||
this.receiveCancellations.forEach(Runnable::run);
|
||||
this.scheduleExecutor.shutdown();
|
||||
this.executionMonitorExecutor.shutdown();
|
||||
ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));
|
||||
ExecutorsUtils.closeScheduledThreadPool(executionMonitorExecutor, Duration.ofSeconds(5), List.of(executionMonitorFuture));
|
||||
try {
|
||||
if (onClose != null) {
|
||||
onClose.run();
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user