mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
271 Commits
fix/filter
...
feat/worke
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ea8d86516 | ||
|
|
a6844e0ecf | ||
|
|
f71574cfb5 | ||
|
|
c5341e56e9 | ||
|
|
88b0723147 | ||
|
|
f79fcf5734 | ||
|
|
cf27827f20 | ||
|
|
408b6b97a7 | ||
|
|
d57753e62b | ||
|
|
2571eaf56c | ||
|
|
37ea7f31a0 | ||
|
|
478c911718 | ||
|
|
1bce0d673f | ||
|
|
609a5b8066 | ||
|
|
6182015a6f | ||
|
|
6f8044f347 | ||
|
|
b3b7596bf4 | ||
|
|
36b1c14424 | ||
|
|
1aef9578d9 | ||
|
|
6a07e3c048 | ||
|
|
b643954921 | ||
|
|
fe1ae290d0 | ||
|
|
6ae2fde78f | ||
|
|
260f5c427b | ||
|
|
f2dbc41cdb | ||
|
|
39fdb7ed5d | ||
|
|
c6b9c445c5 | ||
|
|
da8992f130 | ||
|
|
e448690086 | ||
|
|
3929bf6172 | ||
|
|
ab9951466d | ||
|
|
ef59a6de26 | ||
|
|
0a64ae7e63 | ||
|
|
8c3cd2856a | ||
|
|
6def8ef831 | ||
|
|
0cc1bffc20 | ||
|
|
3bdf55a649 | ||
|
|
767a375292 | ||
|
|
1509ce9b98 | ||
|
|
5a3f3d3312 | ||
|
|
6394c337ae | ||
|
|
be4518466f | ||
|
|
543bed48c9 | ||
|
|
5e57d11b73 | ||
|
|
98189392a2 | ||
|
|
ac9a01964a | ||
|
|
8479323f97 | ||
|
|
4b80b92423 | ||
|
|
2e7d714bcb | ||
|
|
73cf7f04fb | ||
|
|
ac0ab7e8fa | ||
|
|
c1876e69ed | ||
|
|
cf73a80f2e | ||
|
|
53687f4a1f | ||
|
|
749bf94125 | ||
|
|
25a7994f63 | ||
|
|
e03c894f3a | ||
|
|
99772c1a48 | ||
|
|
93d6b816bf | ||
|
|
a3b0512bec | ||
|
|
265f72b629 | ||
|
|
07a8d9a665 | ||
|
|
59bd607db2 | ||
|
|
1618815df4 | ||
|
|
a2c3799ab7 | ||
|
|
986a2b4d11 | ||
|
|
cdd591dab7 | ||
|
|
9f5cf5aeb9 | ||
|
|
cc5f73ae06 | ||
|
|
e461e46a1c | ||
|
|
fa6da9bd0b | ||
|
|
3cb6815eac | ||
|
|
bde9972b26 | ||
|
|
bc828efec9 | ||
|
|
c62f503f1a | ||
|
|
15a6323122 | ||
|
|
21cb7b497d | ||
|
|
26cb6ef9ad | ||
|
|
95c438515d | ||
|
|
194ae826e5 | ||
|
|
31dbecec77 | ||
|
|
b39bcce2e8 | ||
|
|
95ac5ce8a7 | ||
|
|
90f913815d | ||
|
|
5944db5cc8 | ||
|
|
577f813eef | ||
|
|
06a9f13676 | ||
|
|
1fd6e23f96 | ||
|
|
9a32780c8c | ||
|
|
af140baa66 | ||
|
|
54b0183b95 | ||
|
|
64de3d5fa8 | ||
|
|
4c17aadb81 | ||
|
|
bf424fbf53 | ||
|
|
edcdb88559 | ||
|
|
9a9d0b995a | ||
|
|
5c5d313fb0 | ||
|
|
dfd4d87867 | ||
|
|
367d773a86 | ||
|
|
c819f15c66 | ||
|
|
673b5c994c | ||
|
|
2acf37e0e6 | ||
|
|
0d7fcbb936 | ||
|
|
42b01d6951 | ||
|
|
9edfb01920 | ||
|
|
7813337f48 | ||
|
|
ea0342f82a | ||
|
|
ca8f25108e | ||
|
|
49b6c331a6 | ||
|
|
e409fb7ac0 | ||
|
|
0b64c29794 | ||
|
|
c4665460aa | ||
|
|
5423b6e3a7 | ||
|
|
114669e1b5 | ||
|
|
d75f0ced38 | ||
|
|
0a788d8429 | ||
|
|
8c25d1bbd7 | ||
|
|
4e2e8f294f | ||
|
|
2c34804ce2 | ||
|
|
bab4eef790 | ||
|
|
94aa628ac1 | ||
|
|
da180fbc00 | ||
|
|
c7bd592bc7 | ||
|
|
693d174960 | ||
|
|
8ee492b9c5 | ||
|
|
d6b8ba34ea | ||
|
|
08cc853e00 | ||
|
|
4f68715483 | ||
|
|
edde1b6730 | ||
|
|
399446f52e | ||
|
|
c717890fbc | ||
|
|
5328b0c574 | ||
|
|
de14cae1f0 | ||
|
|
d8a3e703e7 | ||
|
|
90659bc320 | ||
|
|
37d1d8856e | ||
|
|
93a4eb5cbc | ||
|
|
de160c8a2d | ||
|
|
28458b59eb | ||
|
|
2a256d9505 | ||
|
|
9008b21007 | ||
|
|
8c13bf6a71 | ||
|
|
43888cc3dd | ||
|
|
c94093d9f6 | ||
|
|
8779dec28a | ||
|
|
41614c3a6e | ||
|
|
6b4fdd0688 | ||
|
|
0319f3d267 | ||
|
|
0b37fe2cb8 | ||
|
|
e623dd7729 | ||
|
|
db4f7cb4ff | ||
|
|
b14b16db0e | ||
|
|
77f6cec0e4 | ||
|
|
1748b18d66 | ||
|
|
32f96348c1 | ||
|
|
07db0a8c80 | ||
|
|
2035fd42c3 | ||
|
|
2856bf07e8 | ||
|
|
f5327cec33 | ||
|
|
42955936b2 | ||
|
|
771b98e023 | ||
|
|
b80e8487e3 | ||
|
|
f35a0b6d60 | ||
|
|
0c9ed17f1c | ||
|
|
7ca20371f8 | ||
|
|
8ff3454cbd | ||
|
|
09593d9fd2 | ||
|
|
d3cccf36f0 | ||
|
|
eeb91cd9ed | ||
|
|
2679b0f067 | ||
|
|
54281864c8 | ||
|
|
e4f9b11d0c | ||
|
|
12cef0593c | ||
|
|
c6cf8f307f | ||
|
|
3b4eb55f84 | ||
|
|
d32949985d | ||
|
|
c051ca2e66 | ||
|
|
93a456963b | ||
|
|
9a45f17680 | ||
|
|
5fb6806d74 | ||
|
|
f3cff72edd | ||
|
|
0abc660e7d | ||
|
|
f09ca3d92e | ||
|
|
9fd778fca1 | ||
|
|
667af25e1b | ||
|
|
1b1aed5ff1 | ||
|
|
da1bb58199 | ||
|
|
d3e661f9f8 | ||
|
|
2126c8815e | ||
|
|
6cfc5b8799 | ||
|
|
16d44034f0 | ||
|
|
f76e62a4af | ||
|
|
f6645da94c | ||
|
|
93b2bbf0d0 | ||
|
|
9d46e2aece | ||
|
|
133315a2a5 | ||
|
|
b96b9bb414 | ||
|
|
9865d8a7dc | ||
|
|
29f22c2f81 | ||
|
|
3e69469381 | ||
|
|
38c24ccf7f | ||
|
|
12cf41a309 | ||
|
|
7b8ea0d885 | ||
|
|
cf88bbcb12 | ||
|
|
6abe7f96e7 | ||
|
|
e73ac78d8b | ||
|
|
b0687eb702 | ||
|
|
85f9070f56 | ||
|
|
0a42ab40ec | ||
|
|
856d2d1d51 | ||
|
|
a7d6dbc8a3 | ||
|
|
cf82109da6 | ||
|
|
d4168ba424 | ||
|
|
46a294f25a | ||
|
|
a229036d8d | ||
|
|
a518fefecd | ||
|
|
1d3210fd7d | ||
|
|
597f84ecb7 | ||
|
|
5f3c7ac9f0 | ||
|
|
77c4691b04 | ||
|
|
6d34416529 | ||
|
|
40a67d5dcd | ||
|
|
2c68c704f6 | ||
|
|
e59d9f622c | ||
|
|
c951ba39a7 | ||
|
|
a0200cfacb | ||
|
|
c6310f0697 | ||
|
|
21ba59a525 | ||
|
|
4f9e3cd06c | ||
|
|
e74010d1a4 | ||
|
|
465e6467e9 | ||
|
|
c68c1b16d9 | ||
|
|
468c32156e | ||
|
|
6e0a1c61ef | ||
|
|
552d55ef6b | ||
|
|
08b0b682bf | ||
|
|
cff90c93bb | ||
|
|
ea465056d0 | ||
|
|
02f150f0b0 | ||
|
|
95d95d3d3c | ||
|
|
6b8d3d6928 | ||
|
|
1e347073ca | ||
|
|
ac09dcecd9 | ||
|
|
40b337cd22 | ||
|
|
5377d16036 | ||
|
|
f717bc413f | ||
|
|
d6bed2d235 | ||
|
|
07fd74b238 | ||
|
|
60eef29de2 | ||
|
|
20ca7b6380 | ||
|
|
9d82df61c6 | ||
|
|
e78210b5eb | ||
|
|
83143fae83 | ||
|
|
25f5ccc6b5 | ||
|
|
cf3e49a284 | ||
|
|
9a72d378df | ||
|
|
752a927fac | ||
|
|
4053392921 | ||
|
|
8b0483643a | ||
|
|
5feeb41c7a | ||
|
|
d7f5e5c05d | ||
|
|
4840f723fc | ||
|
|
8cf159b281 | ||
|
|
4c79576113 | ||
|
|
f87f2ed753 | ||
|
|
298a6c7ca8 | ||
|
|
ab464fff6e | ||
|
|
6dcba16314 | ||
|
|
80a328e87e | ||
|
|
f2034f4975 | ||
|
|
edca56d168 |
4
.github/workflows/auto-translate-ui-keys.yml
vendored
4
.github/workflows/auto-translate-ui-keys.yml
vendored
@@ -2,7 +2,7 @@ name: Auto-Translate UI keys and create PR
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 9-21 * * *" # Every hour from 9 AM to 9 PM
|
||||
- cron: "0 9-21/3 * * *" # Every 3 hours from 9 AM to 9 PM
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
retranslate_modified_keys:
|
||||
@@ -20,7 +20,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 10
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
name: Checkout
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
2
.github/workflows/codeql-analysis.yml
vendored
2
.github/workflows/codeql-analysis.yml
vendored
@@ -27,7 +27,7 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
# We must fetch at least the immediate parents so that if this is
|
||||
# a pull request then we can checkout the head.
|
||||
|
||||
147
.github/workflows/docker.yml
vendored
147
.github/workflows/docker.yml
vendored
@@ -1,147 +0,0 @@
|
||||
name: Create Docker images on Release
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
retag-latest:
|
||||
description: 'Retag latest Docker images'
|
||||
required: true
|
||||
type: string
|
||||
default: "false"
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
release-tag:
|
||||
description: 'Kestra Release Tag'
|
||||
required: false
|
||||
type: string
|
||||
plugin-version:
|
||||
description: 'Plugin version'
|
||||
required: false
|
||||
type: string
|
||||
default: "LATEST"
|
||||
env:
|
||||
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
jobs:
|
||||
plugins:
|
||||
name: List Plugins
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
plugins: ${{ steps.plugins.outputs.plugins }}
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
# Get Plugins List
|
||||
- name: Get Plugins List
|
||||
uses: ./.github/actions/plugins-list
|
||||
id: plugins
|
||||
with:
|
||||
plugin-version: ${{ env.PLUGIN_VERSION }}
|
||||
docker:
|
||||
name: Publish Docker
|
||||
needs: [ plugins ]
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
image:
|
||||
- name: "-no-plugins"
|
||||
plugins: ""
|
||||
packages: jattach
|
||||
python-libs: ""
|
||||
- name: ""
|
||||
plugins: ${{needs.plugins.outputs.plugins}}
|
||||
packages: python3 python-is-python3 python3-pip curl jattach
|
||||
python-libs: kestra
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
# Vars
|
||||
- name: Set image name
|
||||
id: vars
|
||||
run: |
|
||||
if [[ "${{ inputs.release-tag }}" == "" ]]; then
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
else
|
||||
TAG="${{ inputs.release-tag }}"
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
|
||||
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
|
||||
else
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
# Download release
|
||||
- name: Download release
|
||||
uses: robinraju/release-downloader@v1.12
|
||||
with:
|
||||
tag: ${{steps.vars.outputs.tag}}
|
||||
fileName: 'kestra-*'
|
||||
out-file-path: build/executable
|
||||
|
||||
- name: Copy exe to image
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
# Docker setup
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Fix Qemu
|
||||
shell: bash
|
||||
run: |
|
||||
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
# Docker Login
|
||||
- name: Login to DockerHub
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
|
||||
# Docker Build and push
|
||||
- name: Push to Docker Hub
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
|
||||
platforms: linux/amd64,linux/arm64
|
||||
build-args: |
|
||||
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
|
||||
APT_PACKAGES=${{ matrix.image.packages }}
|
||||
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
|
||||
|
||||
- name: Install regctl
|
||||
if: github.event.inputs.retag-latest == 'true'
|
||||
uses: regclient/actions/regctl-installer@main
|
||||
|
||||
- name: Retag to latest
|
||||
if: github.event.inputs.retag-latest == 'true'
|
||||
run: |
|
||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
|
||||
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
- docker
|
||||
if: always()
|
||||
env:
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
steps:
|
||||
|
||||
# Slack
|
||||
- name: Slack notification
|
||||
uses: Gamesight/slack-workflow-status@master
|
||||
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
|
||||
with:
|
||||
repo_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
name: GitHub Actions
|
||||
icon_emoji: ':github-actions:'
|
||||
channel: 'C02DQ1A7JLR' # _int_git channel
|
||||
4
.github/workflows/e2e.yml
vendored
4
.github/workflows/e2e.yml
vendored
@@ -19,7 +19,7 @@ on:
|
||||
default: "no input"
|
||||
jobs:
|
||||
check:
|
||||
timeout-minutes: 10
|
||||
timeout-minutes: 15
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
|
||||
@@ -32,7 +32,7 @@ jobs:
|
||||
password: ${{ github.token }}
|
||||
|
||||
- name: Checkout kestra
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
path: kestra
|
||||
|
||||
|
||||
4
.github/workflows/gradle-release-plugins.yml
vendored
4
.github/workflows/gradle-release-plugins.yml
vendored
@@ -21,12 +21,12 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
|
||||
4
.github/workflows/gradle-release.yml
vendored
4
.github/workflows/gradle-release.yml
vendored
@@ -33,13 +33,13 @@ jobs:
|
||||
exit 1;
|
||||
fi
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
path: kestra
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
|
||||
10
.github/workflows/main.yml
vendored
10
.github/workflows/main.yml
vendored
@@ -4,9 +4,8 @@ on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
default: 'LATEST'
|
||||
required: true
|
||||
description: "plugins version"
|
||||
required: false
|
||||
type: string
|
||||
push:
|
||||
branches:
|
||||
@@ -34,7 +33,7 @@ jobs:
|
||||
if: "!startsWith(github.ref, 'refs/heads/releases')"
|
||||
uses: ./.github/workflows/workflow-release.yml
|
||||
with:
|
||||
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
|
||||
secrets:
|
||||
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
@@ -43,7 +42,8 @@ jobs:
|
||||
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
|
||||
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
|
||||
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
|
||||
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
|
||||
2
.github/workflows/setversion-tag-plugins.yml
vendored
2
.github/workflows/setversion-tag-plugins.yml
vendored
@@ -17,7 +17,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
|
||||
2
.github/workflows/setversion-tag.yml
vendored
2
.github/workflows/setversion-tag.yml
vendored
@@ -34,7 +34,7 @@ jobs:
|
||||
fi
|
||||
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
|
||||
12
.github/workflows/vulnerabilities-check.yml
vendored
12
.github/workflows/vulnerabilities-check.yml
vendored
@@ -17,12 +17,12 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
@@ -66,12 +66,12 @@ jobs:
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
@@ -111,12 +111,12 @@ jobs:
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
path: actions
|
||||
|
||||
2
.github/workflows/workflow-backend-test.yml
vendored
2
.github/workflows/workflow-backend-test.yml
vendored
@@ -29,7 +29,7 @@ jobs:
|
||||
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
|
||||
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
name: Checkout - Current ref
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
76
.github/workflows/workflow-build-artifacts.yml
vendored
76
.github/workflows/workflow-build-artifacts.yml
vendored
@@ -1,23 +1,7 @@
|
||||
name: Build Artifacts
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
default: 'LATEST'
|
||||
required: true
|
||||
type: string
|
||||
outputs:
|
||||
docker-tag:
|
||||
value: ${{ jobs.build.outputs.docker-tag }}
|
||||
description: "The Docker image Tag for Kestra"
|
||||
docker-artifact-name:
|
||||
value: ${{ jobs.build.outputs.docker-artifact-name }}
|
||||
description: "The GitHub artifact containing the Kestra docker image name."
|
||||
plugins:
|
||||
value: ${{ jobs.build.outputs.plugins }}
|
||||
description: "The Kestra plugins list used for the build."
|
||||
workflow_call: {}
|
||||
|
||||
jobs:
|
||||
build:
|
||||
@@ -31,7 +15,7 @@ jobs:
|
||||
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
steps:
|
||||
- name: Checkout - Current ref
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
@@ -82,55 +66,6 @@ jobs:
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
# Docker Tag
|
||||
- name: Setup - Docker vars
|
||||
id: vars
|
||||
shell: bash
|
||||
run: |
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
if [[ $TAG = "master" ]]
|
||||
then
|
||||
TAG="latest";
|
||||
elif [[ $TAG = "develop" ]]
|
||||
then
|
||||
TAG="develop";
|
||||
elif [[ $TAG = v* ]]
|
||||
then
|
||||
TAG="${TAG}";
|
||||
else
|
||||
TAG="build-${{ github.run_id }}";
|
||||
fi
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
echo "artifact=docker-kestra-${TAG}" >> $GITHUB_OUTPUT
|
||||
|
||||
# Docker setup
|
||||
- name: Docker - Setup QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Fix Qemu
|
||||
shell: bash
|
||||
run: |
|
||||
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
|
||||
|
||||
- name: Docker - Setup Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
# Docker Build
|
||||
- name: Docker - Build & export image
|
||||
uses: docker/build-push-action@v6
|
||||
if: "!startsWith(github.ref, 'refs/tags/v')"
|
||||
with:
|
||||
context: .
|
||||
push: false
|
||||
file: Dockerfile
|
||||
tags: |
|
||||
kestra/kestra:${{ steps.vars.outputs.tag }}
|
||||
build-args: |
|
||||
KESTRA_PLUGINS=${{ steps.plugins.outputs.plugins }}
|
||||
APT_PACKAGES=${{ env.DOCKER_APT_PACKAGES }}
|
||||
PYTHON_LIBRARIES=${{ env.DOCKER_PYTHON_LIBRARIES }}
|
||||
outputs: type=docker,dest=/tmp/${{ steps.vars.outputs.artifact }}.tar
|
||||
|
||||
# Upload artifacts
|
||||
- name: Artifacts - Upload JAR
|
||||
uses: actions/upload-artifact@v4
|
||||
@@ -143,10 +78,3 @@ jobs:
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable/
|
||||
|
||||
- name: Artifacts - Upload Docker
|
||||
uses: actions/upload-artifact@v4
|
||||
if: "!startsWith(github.ref, 'refs/tags/v')"
|
||||
with:
|
||||
name: ${{ steps.vars.outputs.artifact }}
|
||||
path: /tmp/${{ steps.vars.outputs.artifact }}.tar
|
||||
|
||||
2
.github/workflows/workflow-frontend-test.yml
vendored
2
.github/workflows/workflow-frontend-test.yml
vendored
@@ -20,7 +20,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
|
||||
- name: Cache Node Modules
|
||||
id: cache-node-modules
|
||||
|
||||
15
.github/workflows/workflow-github-release.yml
vendored
15
.github/workflows/workflow-github-release.yml
vendored
@@ -1,14 +1,17 @@
|
||||
name: Github - Release
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
workflow_call:
|
||||
secrets:
|
||||
GH_PERSONAL_TOKEN:
|
||||
description: "The Github personal token."
|
||||
required: true
|
||||
push:
|
||||
tags:
|
||||
- '*'
|
||||
SLACK_RELEASES_WEBHOOK_URL:
|
||||
description: "The Slack webhook URL."
|
||||
required: true
|
||||
|
||||
|
||||
|
||||
jobs:
|
||||
publish:
|
||||
@@ -17,14 +20,14 @@ jobs:
|
||||
steps:
|
||||
# Check out
|
||||
- name: Checkout - Repository
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
submodules: true
|
||||
|
||||
# Checkout GitHub Actions
|
||||
- name: Checkout - Actions
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
repository: kestra-io/actions
|
||||
sparse-checkout-cone-mode: true
|
||||
@@ -35,7 +38,7 @@ jobs:
|
||||
# Download Exec
|
||||
# Must be done after checkout actions
|
||||
- name: Artifacts - Download executable
|
||||
uses: actions/download-artifact@v4
|
||||
uses: actions/download-artifact@v5
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
with:
|
||||
name: exe
|
||||
|
||||
210
.github/workflows/workflow-publish-docker.yml
vendored
210
.github/workflows/workflow-publish-docker.yml
vendored
@@ -1,22 +1,37 @@
|
||||
name: Publish - Docker
|
||||
name: Create Docker images on Release
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
default: 'LATEST'
|
||||
retag-latest:
|
||||
description: 'Retag latest Docker images'
|
||||
required: true
|
||||
type: choice
|
||||
default: "false"
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
release-tag:
|
||||
description: 'Kestra Release Tag (by default, deduced with the ref)'
|
||||
required: false
|
||||
type: string
|
||||
plugin-version:
|
||||
description: 'Plugin version'
|
||||
required: false
|
||||
type: string
|
||||
default: "LATEST"
|
||||
force-download-artifact:
|
||||
description: 'Force download artifact'
|
||||
required: false
|
||||
type: string
|
||||
type: choice
|
||||
default: "true"
|
||||
options:
|
||||
- "true"
|
||||
- "false"
|
||||
workflow_call:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
description: "Plugin version"
|
||||
default: 'LATEST'
|
||||
required: false
|
||||
type: string
|
||||
@@ -33,47 +48,93 @@ on:
|
||||
description: "The Dockerhub password."
|
||||
required: true
|
||||
|
||||
env:
|
||||
PLUGIN_VERSION: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
|
||||
jobs:
|
||||
plugins:
|
||||
name: List Plugins
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
plugins: ${{ steps.plugins.outputs.plugins }}
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
|
||||
# Get Plugins List
|
||||
- name: Get Plugins List
|
||||
uses: ./.github/actions/plugins-list
|
||||
id: plugins
|
||||
with: # remap LATEST-SNAPSHOT to LATEST
|
||||
plugin-version: ${{ env.PLUGIN_VERSION == 'LATEST-SNAPSHOT' && 'LATEST' || env.PLUGIN_VERSION }}
|
||||
|
||||
# ********************************************************************************************************************
|
||||
# Build
|
||||
# ********************************************************************************************************************
|
||||
build-artifacts:
|
||||
name: Build Artifacts
|
||||
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
|
||||
if: ${{ inputs.force-download-artifact == 'true' }}
|
||||
uses: ./.github/workflows/workflow-build-artifacts.yml
|
||||
with:
|
||||
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
# ********************************************************************************************************************
|
||||
# Docker
|
||||
# ********************************************************************************************************************
|
||||
publish:
|
||||
name: Publish - Docker
|
||||
|
||||
docker:
|
||||
name: Publish Docker
|
||||
needs: [ plugins, build-artifacts ]
|
||||
if: always()
|
||||
runs-on: ubuntu-latest
|
||||
needs: build-artifacts
|
||||
if: |
|
||||
always() &&
|
||||
(needs.build-artifacts.result == 'success' ||
|
||||
github.event.inputs.force-download-artifact != 'true')
|
||||
env:
|
||||
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
strategy:
|
||||
matrix:
|
||||
image:
|
||||
- tag: -no-plugins
|
||||
- name: "-no-plugins"
|
||||
plugins: ""
|
||||
packages: jattach
|
||||
plugins: false
|
||||
python-libraries: ""
|
||||
|
||||
- tag: ""
|
||||
plugins: true
|
||||
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
|
||||
python-libraries: kestra
|
||||
python-libs: ""
|
||||
- name: ""
|
||||
plugins: ${{needs.plugins.outputs.plugins}}
|
||||
packages: python3 python-is-python3 python3-pip curl jattach
|
||||
python-libs: kestra
|
||||
steps:
|
||||
- name: Checkout - Current ref
|
||||
uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
|
||||
# Vars
|
||||
- name: Set image name
|
||||
id: vars
|
||||
run: |
|
||||
if [[ "${{ inputs.release-tag }}" == "" ]]; then
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
else
|
||||
TAG="${{ inputs.release-tag }}"
|
||||
echo "tag=${TAG}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
if [[ $GITHUB_REF == refs/tags/* ]]; then
|
||||
if [[ $TAG =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
|
||||
# this will remove the patch version number
|
||||
MINOR_SEMVER=${TAG%.*}
|
||||
echo "minor_semver=${MINOR_SEMVER}" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "Tag '$TAG' is not a valid semver (vMAJOR.MINOR.PATCH), skipping minor_semver"
|
||||
fi
|
||||
fi
|
||||
|
||||
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
|
||||
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
|
||||
else
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
# Download executable from artifact
|
||||
- name: Artifacts - Download executable
|
||||
uses: actions/download-artifact@v5
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable
|
||||
|
||||
- name: Copy exe to image
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
# Docker setup
|
||||
- name: Docker - Setup QEMU
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Fix Qemu
|
||||
@@ -81,66 +142,59 @@ jobs:
|
||||
run: |
|
||||
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
|
||||
|
||||
- name: Docker - Setup Docker Buildx
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
# Docker Login
|
||||
- name: Docker - Login to DockerHub
|
||||
- name: Login to DockerHub
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
|
||||
|
||||
# # Get Plugins List
|
||||
- name: Plugins - Get List
|
||||
uses: ./.github/actions/plugins-list
|
||||
id: plugins-list
|
||||
if: ${{ matrix.image.plugins}}
|
||||
with:
|
||||
plugin-version: ${{ env.PLUGIN_VERSION }}
|
||||
|
||||
# Vars
|
||||
- name: Docker - Set variables
|
||||
shell: bash
|
||||
id: vars
|
||||
run: |
|
||||
TAG=${GITHUB_REF#refs/*/}
|
||||
PLUGINS="${{ matrix.image.plugins == true && steps.plugins-list.outputs.plugins || '' }}"
|
||||
if [[ $TAG == v* ]]; then
|
||||
TAG="${TAG}";
|
||||
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
|
||||
elif [[ $TAG = "develop" ]]; then
|
||||
TAG="develop";
|
||||
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
|
||||
else
|
||||
TAG="build-${{ github.run_id }}";
|
||||
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
echo "tag=${TAG}${{ matrix.image.tag }}" >> $GITHUB_OUTPUT
|
||||
|
||||
# Build Docker Image
|
||||
- name: Artifacts - Download executable
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable
|
||||
|
||||
- name: Docker - Copy exe to image
|
||||
shell: bash
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
# Docker Build and push
|
||||
- name: Docker - Build image
|
||||
- name: Push to Docker Hub
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
tags: kestra/kestra:${{ steps.vars.outputs.tag }}
|
||||
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
|
||||
platforms: linux/amd64,linux/arm64
|
||||
build-args: |
|
||||
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
|
||||
APT_PACKAGES=${{ matrix.image.packages }}
|
||||
PYTHON_LIBRARIES=${{ matrix.image.python-libraries }}
|
||||
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
|
||||
|
||||
- name: Install regctl
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
uses: regclient/actions/regctl-installer@main
|
||||
|
||||
- name: Retag to minor semver version
|
||||
if: startsWith(github.ref, 'refs/tags/v') && steps.vars.outputs.minor_semver != ''
|
||||
run: |
|
||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.minor_semver, matrix.image.name) }}
|
||||
|
||||
- name: Retag to latest
|
||||
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-latest == 'true'
|
||||
run: |
|
||||
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
|
||||
|
||||
end:
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
- docker
|
||||
if: always()
|
||||
env:
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
steps:
|
||||
|
||||
# Slack
|
||||
- name: Slack notification
|
||||
uses: Gamesight/slack-workflow-status@master
|
||||
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
|
||||
with:
|
||||
repo_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
name: GitHub Actions
|
||||
icon_emoji: ':github-actions:'
|
||||
channel: 'C02DQ1A7JLR' # _int_git channel
|
||||
2
.github/workflows/workflow-publish-maven.yml
vendored
2
.github/workflows/workflow-publish-maven.yml
vendored
@@ -25,7 +25,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout - Current ref
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
|
||||
# Setup build
|
||||
- name: Setup - Build
|
||||
|
||||
16
.github/workflows/workflow-pullrequest-delete-docker.yml
vendored
Normal file
16
.github/workflows/workflow-pullrequest-delete-docker.yml
vendored
Normal file
@@ -0,0 +1,16 @@
|
||||
name: Pull Request - Delete Docker
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [closed]
|
||||
|
||||
jobs:
|
||||
publish:
|
||||
name: Pull Request - Delete Docker
|
||||
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: dataaxiom/ghcr-cleanup-action@v1
|
||||
with:
|
||||
package: kestra-pr
|
||||
delete-tags: ${{ github.event.pull_request.number }}
|
||||
78
.github/workflows/workflow-pullrequest-publish-docker.yml
vendored
Normal file
78
.github/workflows/workflow-pullrequest-publish-docker.yml
vendored
Normal file
@@ -0,0 +1,78 @@
|
||||
name: Pull Request - Publish Docker
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
branches:
|
||||
- develop
|
||||
|
||||
jobs:
|
||||
build-artifacts:
|
||||
name: Build Artifacts
|
||||
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
|
||||
uses: ./.github/workflows/workflow-build-artifacts.yml
|
||||
|
||||
publish:
|
||||
name: Publish Docker
|
||||
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
|
||||
runs-on: ubuntu-latest
|
||||
needs: build-artifacts
|
||||
env:
|
||||
GITHUB_IMAGE_PATH: "ghcr.io/kestra-io/kestra-pr"
|
||||
steps:
|
||||
- name: Checkout - Current ref
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
# Docker setup
|
||||
- name: Docker - Setup QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Docker - Setup Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
# Docker Login
|
||||
- name: Login to GHCR
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
# Build Docker Image
|
||||
- name: Artifacts - Download executable
|
||||
uses: actions/download-artifact@v5
|
||||
with:
|
||||
name: exe
|
||||
path: build/executable
|
||||
|
||||
- name: Docker - Copy exe to image
|
||||
shell: bash
|
||||
run: |
|
||||
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
|
||||
|
||||
- name: Docker - Build image
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
context: .
|
||||
file: ./Dockerfile.pr
|
||||
push: true
|
||||
tags: ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}
|
||||
platforms: linux/amd64,linux/arm64
|
||||
|
||||
# Add comment on pull request
|
||||
- name: Add comment to PR
|
||||
uses: actions/github-script@v7
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
script: |
|
||||
await github.rest.issues.createComment({
|
||||
issue_number: context.issue.number,
|
||||
owner: context.repo.owner,
|
||||
repo: context.repo.repo,
|
||||
body: `**🐋 Docker image**: \`${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}\`\n` +
|
||||
`\n` +
|
||||
`\`\`\`bash\n` +
|
||||
`docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }} server local\n` +
|
||||
`\`\`\``
|
||||
})
|
||||
19
.github/workflows/workflow-release.yml
vendored
19
.github/workflows/workflow-release.yml
vendored
@@ -4,7 +4,7 @@ on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
description: "plugins version"
|
||||
default: 'LATEST'
|
||||
required: false
|
||||
type: string
|
||||
@@ -16,7 +16,7 @@ on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
plugin-version:
|
||||
description: "Kestra version"
|
||||
description: "plugins version"
|
||||
default: 'LATEST'
|
||||
required: false
|
||||
type: string
|
||||
@@ -42,21 +42,25 @@ on:
|
||||
SONATYPE_GPG_FILE:
|
||||
description: "The Sonatype GPG file."
|
||||
required: true
|
||||
GH_PERSONAL_TOKEN:
|
||||
description: "GH personnal Token."
|
||||
required: true
|
||||
SLACK_RELEASES_WEBHOOK_URL:
|
||||
description: "Slack webhook for releases channel."
|
||||
required: true
|
||||
jobs:
|
||||
build-artifacts:
|
||||
name: Build - Artifacts
|
||||
uses: ./.github/workflows/workflow-build-artifacts.yml
|
||||
with:
|
||||
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
|
||||
Docker:
|
||||
name: Publish Docker
|
||||
needs: build-artifacts
|
||||
uses: ./.github/workflows/workflow-publish-docker.yml
|
||||
if: startsWith(github.ref, 'refs/heads/develop') || github.event.inputs.publish-docker == 'true'
|
||||
if: github.ref == 'refs/heads/develop' || inputs.publish-docker == 'true'
|
||||
with:
|
||||
force-download-artifact: 'false'
|
||||
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
|
||||
plugin-version: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
|
||||
secrets:
|
||||
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
@@ -77,4 +81,5 @@ jobs:
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
uses: ./.github/workflows/workflow-github-release.yml
|
||||
secrets:
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
|
||||
2
.github/workflows/workflow-test.yml
vendored
2
.github/workflows/workflow-test.yml
vendored
@@ -27,7 +27,7 @@ jobs:
|
||||
ui: ${{ steps.changes.outputs.ui }}
|
||||
backend: ${{ steps.changes.outputs.backend }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v5
|
||||
if: "!startsWith(github.ref, 'refs/tags/v')"
|
||||
- uses: dorny/paths-filter@v3
|
||||
if: "!startsWith(github.ref, 'refs/tags/v')"
|
||||
|
||||
7
.plugins
7
.plugins
@@ -19,6 +19,7 @@
|
||||
#plugin-databricks:io.kestra.plugin:plugin-databricks:LATEST
|
||||
#plugin-datahub:io.kestra.plugin:plugin-datahub:LATEST
|
||||
#plugin-dataform:io.kestra.plugin:plugin-dataform:LATEST
|
||||
#plugin-datagen:io.kestra.plugin:plugin-datagen:LATEST
|
||||
#plugin-dbt:io.kestra.plugin:plugin-dbt:LATEST
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-db2:LATEST
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-mongodb:LATEST
|
||||
@@ -26,6 +27,7 @@
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-oracle:LATEST
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-postgres:LATEST
|
||||
#plugin-debezium:io.kestra.plugin:plugin-debezium-sqlserver:LATEST
|
||||
#plugin-deepseek:io.kestra.plugin:plugin-deepseek:LATEST
|
||||
#plugin-docker:io.kestra.plugin:plugin-docker:LATEST
|
||||
#plugin-elasticsearch:io.kestra.plugin:plugin-elasticsearch:LATEST
|
||||
#plugin-fivetran:io.kestra.plugin:plugin-fivetran:LATEST
|
||||
@@ -86,13 +88,18 @@
|
||||
#plugin-powerbi:io.kestra.plugin:plugin-powerbi:LATEST
|
||||
#plugin-pulsar:io.kestra.plugin:plugin-pulsar:LATEST
|
||||
#plugin-redis:io.kestra.plugin:plugin-redis:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-bun:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-deno:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-go:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-groovy:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-jbang:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-julia:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-jython:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-lua:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-nashorn:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-node:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-perl:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-php:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-powershell:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-python:LATEST
|
||||
#plugin-scripts:io.kestra.plugin:plugin-script-r:LATEST
|
||||
|
||||
7
Dockerfile.pr
Normal file
7
Dockerfile.pr
Normal file
@@ -0,0 +1,7 @@
|
||||
FROM kestra/kestra:develop
|
||||
|
||||
USER root
|
||||
|
||||
COPY --chown=kestra:kestra docker /
|
||||
|
||||
USER kestra
|
||||
@@ -65,10 +65,6 @@ Kestra is an open-source, event-driven orchestration platform that makes both **
|
||||
|
||||
## 🚀 Quick Start
|
||||
|
||||
### Try the Live Demo
|
||||
|
||||
Try Kestra with our [**Live Demo**](https://demo.kestra.io/ui/login?auto). No installation required!
|
||||
|
||||
### Get Started Locally in 5 Minutes
|
||||
|
||||
#### Launch Kestra in Docker
|
||||
|
||||
@@ -7,7 +7,7 @@ set -e
|
||||
# run tests on this image
|
||||
|
||||
|
||||
LOCAL_IMAGE_VERSION="local-e2e"
|
||||
LOCAL_IMAGE_VERSION="local-e2e-$(date +%s)"
|
||||
|
||||
echo "Running E2E"
|
||||
echo "Start time: $(date '+%Y-%m-%d %H:%M:%S')"
|
||||
@@ -15,6 +15,7 @@ start_time=$(date +%s)
|
||||
|
||||
echo ""
|
||||
echo "Building the image for this current repository"
|
||||
make clean
|
||||
make build-docker VERSION=$LOCAL_IMAGE_VERSION
|
||||
|
||||
end_time=$(date +%s)
|
||||
@@ -32,7 +33,7 @@ echo "npm i"
|
||||
npm i
|
||||
|
||||
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
|
||||
sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
|
||||
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
|
||||
|
||||
end_time2=$(date +%s)
|
||||
elapsed2=$(( end_time2 - start_time2 ))
|
||||
|
||||
50
build.gradle
50
build.gradle
@@ -16,7 +16,7 @@ plugins {
|
||||
id "java"
|
||||
id 'java-library'
|
||||
id "idea"
|
||||
id "com.gradleup.shadow" version "8.3.8"
|
||||
id "com.gradleup.shadow" version "8.3.9"
|
||||
id "application"
|
||||
|
||||
// test
|
||||
@@ -225,14 +225,14 @@ subprojects {
|
||||
}
|
||||
|
||||
testlogger {
|
||||
theme 'mocha-parallel'
|
||||
showExceptions true
|
||||
showFullStackTraces true
|
||||
showCauses true
|
||||
slowThreshold 2000
|
||||
showStandardStreams true
|
||||
showPassedStandardStreams false
|
||||
showSkippedStandardStreams true
|
||||
theme = 'mocha-parallel'
|
||||
showExceptions = true
|
||||
showFullStackTraces = true
|
||||
showCauses = true
|
||||
slowThreshold = 2000
|
||||
showStandardStreams = true
|
||||
showPassedStandardStreams = false
|
||||
showSkippedStandardStreams = true
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -410,7 +410,7 @@ jar {
|
||||
shadowJar {
|
||||
archiveClassifier.set(null)
|
||||
mergeServiceFiles()
|
||||
zip64 true
|
||||
zip64 = true
|
||||
}
|
||||
|
||||
distZip.dependsOn shadowJar
|
||||
@@ -427,8 +427,8 @@ def executableDir = layout.buildDirectory.dir("executable")
|
||||
def executable = layout.buildDirectory.file("executable/${project.name}-${project.version}").get().asFile
|
||||
|
||||
tasks.register('writeExecutableJar') {
|
||||
group "build"
|
||||
description "Write an executable jar from shadow jar"
|
||||
group = "build"
|
||||
description = "Write an executable jar from shadow jar"
|
||||
dependsOn = [shadowJar]
|
||||
|
||||
final shadowJarFile = tasks.shadowJar.outputs.files.singleFile
|
||||
@@ -454,8 +454,8 @@ tasks.register('writeExecutableJar') {
|
||||
}
|
||||
|
||||
tasks.register('executableJar', Zip) {
|
||||
group "build"
|
||||
description "Zip the executable jar"
|
||||
group = "build"
|
||||
description = "Zip the executable jar"
|
||||
dependsOn = [writeExecutableJar]
|
||||
|
||||
archiveFileName = "${project.name}-${project.version}.zip"
|
||||
@@ -620,6 +620,28 @@ subprojects {subProject ->
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (subProject.name != 'platform' && subProject.name != 'cli') {
|
||||
// only if a test source set actually exists (avoids empty artifacts)
|
||||
def hasTests = subProject.extensions.findByName('sourceSets')?.findByName('test') != null
|
||||
|
||||
if (hasTests) {
|
||||
// wire the artifact onto every Maven publication of this subproject
|
||||
publishing {
|
||||
publications {
|
||||
withType(MavenPublication).configureEach { pub ->
|
||||
// keep the normal java component + sources/javadoc already configured
|
||||
pub.artifact(subProject.tasks.named('testsJar').get())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// make sure publish tasks build the tests jar first
|
||||
tasks.matching { it.name.startsWith('publish') }.configureEach {
|
||||
dependsOn subProject.tasks.named('testsJar')
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -34,6 +34,7 @@ dependencies {
|
||||
implementation project(":storage-local")
|
||||
|
||||
implementation project(":webserver")
|
||||
implementation project(":worker")
|
||||
|
||||
//test
|
||||
testImplementation "org.wiremock:wiremock-jetty12"
|
||||
|
||||
@@ -16,6 +16,6 @@ abstract public class AbstractServerCommand extends AbstractCommand implements S
|
||||
}
|
||||
|
||||
protected static int defaultWorkerThread() {
|
||||
return Runtime.getRuntime().availableProcessors() * 4;
|
||||
return Runtime.getRuntime().availableProcessors() * 8;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import picocli.CommandLine;
|
||||
WebServerCommand.class,
|
||||
WorkerCommand.class,
|
||||
LocalCommand.class,
|
||||
WorkerAgentCommand.class,
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
|
||||
@@ -10,6 +10,7 @@ import io.kestra.core.runners.StandAloneRunner;
|
||||
import io.kestra.core.services.SkipExecutionService;
|
||||
import io.kestra.core.services.StartExecutorService;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.controller.Controller;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -48,7 +49,7 @@ public class StandAloneCommand extends AbstractServerCommand {
|
||||
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
|
||||
private String tenantId;
|
||||
|
||||
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to four times the number of available processors. Set it to 0 to avoid starting a worker.")
|
||||
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to eight times the number of available processors. Set it to 0 to avoid starting a worker.")
|
||||
private int workerThread = defaultWorkerThread();
|
||||
|
||||
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
|
||||
@@ -110,6 +111,8 @@ public class StandAloneCommand extends AbstractServerCommand {
|
||||
}
|
||||
|
||||
StandAloneRunner standAloneRunner = applicationContext.getBean(StandAloneRunner.class);
|
||||
|
||||
Controller controller = applicationContext.getBean(Controller.class);
|
||||
|
||||
if (this.workerThread == 0) {
|
||||
standAloneRunner.setWorkerEnabled(false);
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
package io.kestra.cli.commands.servers;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.worker.Worker;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import jakarta.inject.Inject;
|
||||
import picocli.CommandLine;
|
||||
import picocli.CommandLine.Option;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "worker-agent",
|
||||
description = "Start the Kestra worker"
|
||||
)
|
||||
public class WorkerAgentCommand extends AbstractServerCommand {
|
||||
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to four times the number of available processors")
|
||||
private int thread = defaultWorkerThread();
|
||||
|
||||
@Option(names = {"-g", "--worker-group"}, description = "The worker group key, must match the regex [a-zA-Z0-9_-]+ (EE only)")
|
||||
private String workerGroupKey = null;
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static Map<String, Object> propertiesOverrides() {
|
||||
return ImmutableMap.of(
|
||||
"kestra.server-type", ServerType.WORKER_AGENT
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
|
||||
KestraContext.getContext().injectWorkerConfigs(thread, workerGroupKey);
|
||||
|
||||
super.call();
|
||||
|
||||
if (this.workerGroupKey != null && !this.workerGroupKey.matches("[a-zA-Z0-9_-]+")) {
|
||||
throw new IllegalArgumentException("The --worker-group option must match the [a-zA-Z0-9_-]+ pattern");
|
||||
}
|
||||
|
||||
Worker worker = applicationContext.getBean(Worker.class);
|
||||
worker.start(thread, workerGroupKey);
|
||||
|
||||
Await.until(() -> !this.applicationContext.isRunning());
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
public String workerGroupKey() {
|
||||
return workerGroupKey;
|
||||
}
|
||||
}
|
||||
@@ -22,7 +22,7 @@ public class WorkerCommand extends AbstractServerCommand {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to four times the number of available processors")
|
||||
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to eight times the number of available processors")
|
||||
private int thread = defaultWorkerThread();
|
||||
|
||||
@Option(names = {"-g", "--worker-group"}, description = "The worker group key, must match the regex [a-zA-Z0-9_-]+ (EE only)")
|
||||
|
||||
@@ -10,24 +10,21 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.services.FlowListenersInterface;
|
||||
import io.kestra.core.services.PluginDefaultService;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.*;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
@Requires(property = "micronaut.io.watch.enabled", value = "true")
|
||||
@@ -49,13 +46,9 @@ public class FileChangedEventListener {
|
||||
@Inject
|
||||
protected FlowListenersInterface flowListeners;
|
||||
|
||||
@Nullable
|
||||
@Value("${micronaut.io.watch.tenantId}")
|
||||
private String tenantId;
|
||||
|
||||
FlowFilesManager flowFilesManager;
|
||||
|
||||
private List<FlowWithPath> flows = new ArrayList<>();
|
||||
private List<FlowWithPath> flows = new CopyOnWriteArrayList<>();
|
||||
|
||||
private boolean isStarted = false;
|
||||
|
||||
@@ -113,8 +106,6 @@ public class FileChangedEventListener {
|
||||
}
|
||||
|
||||
public void startListening(List<Path> paths) throws IOException, InterruptedException {
|
||||
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
|
||||
|
||||
for (Path path : paths) {
|
||||
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
|
||||
}
|
||||
@@ -157,12 +148,20 @@ public class FileChangedEventListener {
|
||||
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
|
||||
}
|
||||
|
||||
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
|
||||
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(filePath), content));
|
||||
log.info("Flow {} from file {} has been created or modified", flow.get().getId(), entry);
|
||||
}
|
||||
|
||||
} catch (NoSuchFileException e) {
|
||||
log.error("File not found: {}", entry, e);
|
||||
log.warn("File not found: {}, deleting it", entry, e);
|
||||
// the file might have been deleted while reading so if not found we try to delete the flow
|
||||
flows.stream()
|
||||
.filter(flow -> flow.getPath().equals(filePath.toString()))
|
||||
.findFirst()
|
||||
.ifPresent(flowWithPath -> {
|
||||
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
|
||||
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
|
||||
});
|
||||
} catch (IOException e) {
|
||||
log.error("Error reading file: {}", entry, e);
|
||||
}
|
||||
@@ -193,8 +192,6 @@ public class FileChangedEventListener {
|
||||
}
|
||||
|
||||
private void loadFlowsFromFolder(Path folder) {
|
||||
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
|
||||
|
||||
try {
|
||||
Files.walkFileTree(folder, new SimpleFileVisitor<Path>() {
|
||||
@Override
|
||||
@@ -214,7 +211,7 @@ public class FileChangedEventListener {
|
||||
|
||||
if (flow.isPresent() && flows.stream().noneMatch(flowWithPath -> flowWithPath.uidWithoutRevision().equals(flow.get().uidWithoutRevision()))) {
|
||||
flows.add(FlowWithPath.of(flow.get(), file.toString()));
|
||||
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
|
||||
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(file), content));
|
||||
}
|
||||
}
|
||||
return FileVisitResult.CONTINUE;
|
||||
@@ -238,10 +235,8 @@ public class FileChangedEventListener {
|
||||
}
|
||||
|
||||
private Optional<FlowWithSource> parseFlow(String content, Path entry) {
|
||||
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
|
||||
|
||||
try {
|
||||
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(tenantId, content, false);
|
||||
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(getTenantIdFromPath(entry), content, false);
|
||||
modelValidator.validate(flow);
|
||||
return Optional.of(flow);
|
||||
} catch (ConstraintViolationException | FlowProcessingException e) {
|
||||
@@ -265,4 +260,8 @@ public class FileChangedEventListener {
|
||||
private Path buildPath(FlowInterface flow) {
|
||||
return fileWatchConfiguration.getPaths().getFirst().resolve(flow.uidWithoutRevision() + ".yml");
|
||||
}
|
||||
|
||||
private String getTenantIdFromPath(Path path) {
|
||||
return path.getFileName().toString().split("_")[0];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,7 +50,6 @@ micronaut:
|
||||
caches:
|
||||
default:
|
||||
maximum-weight: 10485760
|
||||
|
||||
http:
|
||||
client:
|
||||
read-idle-timeout: 60s
|
||||
@@ -93,7 +92,13 @@ jackson:
|
||||
deserialization:
|
||||
FAIL_ON_UNKNOWN_PROPERTIES: false
|
||||
|
||||
endpoints:
|
||||
# Disable Micronaut GRPC
|
||||
grpc:
|
||||
server:
|
||||
enabled: false
|
||||
health:
|
||||
enabled: false
|
||||
endpoints:
|
||||
all:
|
||||
port: 8081
|
||||
enabled: true
|
||||
@@ -212,7 +217,7 @@ kestra:
|
||||
retention: 30d
|
||||
anonymous-usage-report:
|
||||
enabled: true
|
||||
uri: https://api.kestra.io/v1/reports/usages
|
||||
uri: https://api.kestra.io/v1/server-events/
|
||||
initial-delay: 5m
|
||||
fixed-delay: 1h
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.cli.services;
|
||||
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
|
||||
@@ -71,7 +72,9 @@ class FileChangedEventListenerTest {
|
||||
type: io.kestra.plugin.core.log.Log
|
||||
message: Hello World! 🚀
|
||||
""";
|
||||
Files.write(Path.of(FILE_WATCH + "/myflow.yaml"), flow.getBytes());
|
||||
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, flow);
|
||||
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), flow.getBytes());
|
||||
Await.until(
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isPresent(),
|
||||
Duration.ofMillis(100),
|
||||
@@ -83,7 +86,7 @@ class FileChangedEventListenerTest {
|
||||
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
|
||||
|
||||
// delete the flow
|
||||
Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
|
||||
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
|
||||
Await.until(
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isEmpty(),
|
||||
Duration.ofMillis(100),
|
||||
@@ -110,7 +113,8 @@ class FileChangedEventListenerTest {
|
||||
values:
|
||||
message: Hello World!
|
||||
""";
|
||||
Files.write(Path.of(FILE_WATCH + "/plugin-default.yaml"), pluginDefault.getBytes());
|
||||
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, pluginDefault);
|
||||
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), pluginDefault.getBytes());
|
||||
Await.until(
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isPresent(),
|
||||
Duration.ofMillis(100),
|
||||
@@ -122,7 +126,7 @@ class FileChangedEventListenerTest {
|
||||
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
|
||||
|
||||
// delete both files
|
||||
Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));
|
||||
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
|
||||
Await.until(
|
||||
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
|
||||
Duration.ofMillis(100),
|
||||
|
||||
@@ -53,6 +53,8 @@ import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import static io.kestra.core.docs.AbstractClassDocumentation.flattenWithoutType;
|
||||
import static io.kestra.core.docs.AbstractClassDocumentation.required;
|
||||
import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
|
||||
|
||||
@Singleton
|
||||
@@ -92,12 +94,16 @@ public class JsonSchemaGenerator {
|
||||
}
|
||||
|
||||
public <T> Map<String, Object> schemas(Class<? extends T> cls, boolean arrayOf, List<String> allowedPluginTypes) {
|
||||
return this.schemas(cls, arrayOf, allowedPluginTypes, false);
|
||||
}
|
||||
|
||||
public <T> Map<String, Object> schemas(Class<? extends T> cls, boolean arrayOf, List<String> allowedPluginTypes, boolean withOutputs) {
|
||||
SchemaGeneratorConfigBuilder builder = new SchemaGeneratorConfigBuilder(
|
||||
SchemaVersion.DRAFT_7,
|
||||
OptionPreset.PLAIN_JSON
|
||||
);
|
||||
|
||||
this.build(builder, true, allowedPluginTypes);
|
||||
this.build(builder, true, allowedPluginTypes, withOutputs);
|
||||
|
||||
SchemaGeneratorConfig schemaGeneratorConfig = builder.build();
|
||||
|
||||
@@ -122,12 +128,13 @@ public class JsonSchemaGenerator {
|
||||
if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
|
||||
List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
|
||||
.map(JsonNode::asText)
|
||||
.toList();
|
||||
.collect(Collectors.toList());
|
||||
|
||||
properties.fields().forEachRemaining(e -> {
|
||||
int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
|
||||
if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
|
||||
requiredPropsNode.remove(indexInRequiredArray);
|
||||
requiredFieldValues.remove(indexInRequiredArray);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -248,6 +255,10 @@ public class JsonSchemaGenerator {
|
||||
}
|
||||
|
||||
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7, List<String> allowedPluginTypes) {
|
||||
this.build(builder, draft7, allowedPluginTypes, false);
|
||||
}
|
||||
|
||||
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7, List<String> allowedPluginTypes, boolean withOutputs) {
|
||||
// builder.withObjectMapper(builder.getObjectMapper().configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false));
|
||||
builder
|
||||
.with(new JakartaValidationModule(
|
||||
@@ -429,6 +440,13 @@ public class JsonSchemaGenerator {
|
||||
if (pluginAnnotation.beta()) {
|
||||
collectedTypeAttributes.put("$beta", true);
|
||||
}
|
||||
|
||||
if (withOutputs) {
|
||||
Map<String, Object> outputsSchema = this.outputs(null, scope.getType().getErasedType());
|
||||
collectedTypeAttributes.set("outputs", context.getGeneratorConfig().createObjectNode().pojoNode(
|
||||
flattenWithoutType(AbstractClassDocumentation.properties(outputsSchema), required(outputsSchema))
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// handle deprecated tasks
|
||||
|
||||
@@ -15,6 +15,8 @@ import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class MetricRegistry {
|
||||
@@ -180,11 +182,26 @@ public class MetricRegistry {
|
||||
* statement.
|
||||
*/
|
||||
public <T extends Number> T gauge(String name, String description, T number, String... tags) {
|
||||
Gauge.builder(metricName(name), () -> number)
|
||||
registerGauge(name, description, () -> number, tags);
|
||||
return number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a gauge that reports the value of the {@link Number}.
|
||||
*
|
||||
* @param name Name of the gauge being registered.
|
||||
* @param description The metric description
|
||||
* @param supplier A function that yields a double value for the gauge.
|
||||
* @param tags Sequence of dimensions for breaking down the name.
|
||||
* @param <T> The type of the number from which the gauge value is extracted.
|
||||
* @return The number that was passed in so the registration can be done as part of an assignment
|
||||
* statement.
|
||||
*/
|
||||
public <T extends Number> Gauge registerGauge(String name, String description, Supplier<T> supplier, String... tags) {
|
||||
return Gauge.builder(metricName(name),supplier)
|
||||
.description(description)
|
||||
.tags(tags)
|
||||
.register(this.meterRegistry);
|
||||
return number;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
package io.kestra.core.models;
|
||||
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public record Label(@NotNull String key, @NotNull String value) {
|
||||
@@ -29,11 +28,36 @@ public record Label(@NotNull String key, @NotNull String value) {
|
||||
* @return the nested {@link Map}.
|
||||
*/
|
||||
public static Map<String, Object> toNestedMap(List<Label> labels) {
|
||||
Map<String, Object> asMap = labels.stream()
|
||||
return MapUtils.flattenToNestedMap(toMap(labels));
|
||||
}
|
||||
|
||||
/**
|
||||
* Static helper method for converting a list of labels to a flat map.
|
||||
* Key order is kept.
|
||||
*
|
||||
* @param labels The list of {@link Label} to be converted.
|
||||
* @return the flat {@link Map}.
|
||||
*/
|
||||
public static Map<String, String> toMap(@Nullable List<Label> labels) {
|
||||
if (labels == null || labels.isEmpty()) return Collections.emptyMap();
|
||||
return labels.stream()
|
||||
.filter(label -> label.value() != null && label.key() != null)
|
||||
// using an accumulator in case labels with the same key exists: the first is kept
|
||||
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> first));
|
||||
return MapUtils.flattenToNestedMap(asMap);
|
||||
// using an accumulator in case labels with the same key exists: the second is kept
|
||||
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
|
||||
}
|
||||
|
||||
/**
|
||||
* Static helper method for deduplicating a list of labels by their key.
|
||||
* Value of the last key occurrence is kept.
|
||||
*
|
||||
* @param labels The list of {@link Label} to be deduplicated.
|
||||
* @return the deduplicated {@link List}.
|
||||
*/
|
||||
public static List<Label> deduplicate(@Nullable List<Label> labels) {
|
||||
if (labels == null || labels.isEmpty()) return Collections.emptyList();
|
||||
return toMap(labels).entrySet().stream()
|
||||
.map(entry -> new Label(entry.getKey(), entry.getValue()))
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -6,9 +6,9 @@ import com.fasterxml.jackson.annotation.JsonValue;
|
||||
import io.kestra.core.exceptions.InvalidQueryFiltersException;
|
||||
import io.kestra.core.models.dashboards.filters.*;
|
||||
import io.kestra.core.utils.Enums;
|
||||
import java.util.ArrayList;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -49,42 +49,27 @@ public record QueryFilter(
|
||||
PREFIX
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private List<Object> asValues(Object value) {
|
||||
return value instanceof String valueStr ? Arrays.asList(valueStr.split(",")) : (List<Object>) value;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T extends Enum<T>> AbstractFilter<T> toDashboardFilterBuilder(T field, Object value) {
|
||||
switch (this.operation) {
|
||||
case EQUALS:
|
||||
return EqualTo.<T>builder().field(field).value(value).build();
|
||||
case NOT_EQUALS:
|
||||
return NotEqualTo.<T>builder().field(field).value(value).build();
|
||||
case GREATER_THAN:
|
||||
return GreaterThan.<T>builder().field(field).value(value).build();
|
||||
case LESS_THAN:
|
||||
return LessThan.<T>builder().field(field).value(value).build();
|
||||
case GREATER_THAN_OR_EQUAL_TO:
|
||||
return GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
|
||||
case LESS_THAN_OR_EQUAL_TO:
|
||||
return LessThanOrEqualTo.<T>builder().field(field).value(value).build();
|
||||
case IN:
|
||||
return In.<T>builder().field(field).values(asValues(value)).build();
|
||||
case NOT_IN:
|
||||
return NotIn.<T>builder().field(field).values(asValues(value)).build();
|
||||
case STARTS_WITH:
|
||||
return StartsWith.<T>builder().field(field).value(value.toString()).build();
|
||||
case ENDS_WITH:
|
||||
return EndsWith.<T>builder().field(field).value(value.toString()).build();
|
||||
case CONTAINS:
|
||||
return Contains.<T>builder().field(field).value(value.toString()).build();
|
||||
case REGEX:
|
||||
return Regex.<T>builder().field(field).value(value.toString()).build();
|
||||
case PREFIX:
|
||||
return Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported operation: " + this.operation);
|
||||
}
|
||||
return switch (this.operation) {
|
||||
case EQUALS -> EqualTo.<T>builder().field(field).value(value).build();
|
||||
case NOT_EQUALS -> NotEqualTo.<T>builder().field(field).value(value).build();
|
||||
case GREATER_THAN -> GreaterThan.<T>builder().field(field).value(value).build();
|
||||
case LESS_THAN -> LessThan.<T>builder().field(field).value(value).build();
|
||||
case GREATER_THAN_OR_EQUAL_TO -> GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
|
||||
case LESS_THAN_OR_EQUAL_TO -> LessThanOrEqualTo.<T>builder().field(field).value(value).build();
|
||||
case IN -> In.<T>builder().field(field).values(asValues(value)).build();
|
||||
case NOT_IN -> NotIn.<T>builder().field(field).values(asValues(value)).build();
|
||||
case STARTS_WITH -> StartsWith.<T>builder().field(field).value(value.toString()).build();
|
||||
case ENDS_WITH -> EndsWith.<T>builder().field(field).value(value.toString()).build();
|
||||
case CONTAINS -> Contains.<T>builder().field(field).value(value.toString()).build();
|
||||
case REGEX -> Regex.<T>builder().field(field).value(value.toString()).build();
|
||||
case PREFIX -> Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
|
||||
};
|
||||
}
|
||||
|
||||
public enum Field {
|
||||
@@ -154,6 +139,12 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
EXECUTION_ID("executionId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
CHILD_FILTER("childFilter") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
@@ -228,7 +219,7 @@ public record QueryFilter(
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.START_DATE,
|
||||
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL
|
||||
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL, Field.EXECUTION_ID
|
||||
);
|
||||
}
|
||||
},
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
package io.kestra.core.models;
|
||||
|
||||
public enum ServerType {
|
||||
public enum ServerType {
|
||||
EXECUTOR,
|
||||
INDEXER,
|
||||
SCHEDULER,
|
||||
STANDALONE,
|
||||
WEBSERVER,
|
||||
WORKER,
|
||||
WORKER_AGENT,
|
||||
}
|
||||
|
||||
@@ -62,6 +62,7 @@ public record ServiceUsage(
|
||||
|
||||
List<DailyServiceStatistics> statistics = Arrays
|
||||
.stream(ServiceType.values())
|
||||
.filter(it -> !it.equals(ServiceType.INVALID))
|
||||
.map(type -> of(from, to, repository, type, interval))
|
||||
.toList();
|
||||
return new ServiceUsage(statistics);
|
||||
|
||||
@@ -1,74 +0,0 @@
|
||||
package io.kestra.core.models.collectors;
|
||||
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@Jacksonized
|
||||
@Introspected
|
||||
@AllArgsConstructor
|
||||
public class Usage {
|
||||
@NotNull
|
||||
private final String uuid;
|
||||
|
||||
@NotNull
|
||||
private final String startUuid;
|
||||
|
||||
@NotNull
|
||||
private final String instanceUuid;
|
||||
|
||||
@NotNull
|
||||
private final ServerType serverType;
|
||||
|
||||
@NotNull
|
||||
private final String version;
|
||||
|
||||
@NotNull
|
||||
private final ZoneId zoneId;
|
||||
|
||||
@Nullable
|
||||
private final String uri;
|
||||
|
||||
@Nullable
|
||||
private final Set<String> environments;
|
||||
|
||||
@NotNull
|
||||
private final Instant startTime;
|
||||
|
||||
@Valid
|
||||
private final HostUsage host;
|
||||
|
||||
@Valid
|
||||
private final ConfigurationUsage configurations;
|
||||
|
||||
@Valid
|
||||
private final List<PluginUsage> plugins;
|
||||
|
||||
@Valid
|
||||
private final FlowUsage flows;
|
||||
|
||||
@Valid
|
||||
private final ExecutionUsage executions;
|
||||
|
||||
@Valid
|
||||
@Nullable
|
||||
private ServiceUsage services;
|
||||
|
||||
@Valid
|
||||
@Nullable
|
||||
private List<PluginMetric> pluginMetrics;
|
||||
}
|
||||
@@ -25,6 +25,7 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
|
||||
import io.kestra.core.services.LabelService;
|
||||
import io.kestra.core.test.flow.TaskFixture;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.MapUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import jakarta.annotation.Nullable;
|
||||
@@ -131,12 +132,12 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
* @param labels The Flow labels.
|
||||
* @return a new {@link Execution}.
|
||||
*/
|
||||
public static Execution newExecution(final Flow flow, final List<Label> labels) {
|
||||
public static Execution newExecution(final FlowInterface flow, final List<Label> labels) {
|
||||
return newExecution(flow, null, labels, Optional.empty());
|
||||
}
|
||||
|
||||
public List<Label> getLabels() {
|
||||
return Optional.ofNullable(this.labels).orElse(new ArrayList<>());
|
||||
return ListUtils.emptyOnNull(this.labels);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -181,8 +182,22 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Customization of Lombok-generated builder.
|
||||
*/
|
||||
public static class ExecutionBuilder {
|
||||
|
||||
/**
|
||||
* Enforce unique values of {@link Label} when using the builder.
|
||||
*
|
||||
* @param labels The labels.
|
||||
* @return Deduplicated labels.
|
||||
*/
|
||||
public ExecutionBuilder labels(List<Label> labels) {
|
||||
this.labels = Label.deduplicate(labels);
|
||||
return this;
|
||||
}
|
||||
|
||||
void prebuild() {
|
||||
this.originalId = this.id;
|
||||
this.metadata = ExecutionMetadata.builder()
|
||||
@@ -231,7 +246,6 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
}
|
||||
|
||||
public Execution withLabels(List<Label> labels) {
|
||||
|
||||
return new Execution(
|
||||
this.tenantId,
|
||||
this.id,
|
||||
@@ -241,7 +255,7 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
this.taskRunList,
|
||||
this.inputs,
|
||||
this.outputs,
|
||||
labels,
|
||||
Label.deduplicate(labels),
|
||||
this.variables,
|
||||
this.state,
|
||||
this.parentId,
|
||||
@@ -400,7 +414,7 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
*
|
||||
* @param resolvedTasks normal tasks
|
||||
* @param resolvedErrors errors tasks
|
||||
* @param resolvedErrors finally tasks
|
||||
* @param resolvedFinally finally tasks
|
||||
* @return the flow we need to follow
|
||||
*/
|
||||
public List<ResolvedTask> findTaskDependingFlowState(
|
||||
@@ -1026,6 +1040,16 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find all children of this {@link TaskRun}.
|
||||
*/
|
||||
public List<TaskRun> findChildren(TaskRun parentTaskRun) {
|
||||
return taskRunList.stream()
|
||||
.filter(taskRun -> parentTaskRun.getId().equals(taskRun.getParentTaskRunId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
|
||||
public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
|
||||
return (withCurrent ?
|
||||
Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :
|
||||
|
||||
@@ -38,6 +38,8 @@ public abstract class AbstractFlow implements FlowInterface {
|
||||
@Min(value = 1)
|
||||
Integer revision;
|
||||
|
||||
String description;
|
||||
|
||||
@Valid
|
||||
List<Input<?>> inputs;
|
||||
|
||||
|
||||
@@ -61,13 +61,10 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
}
|
||||
});
|
||||
|
||||
String description;
|
||||
|
||||
Map<String, Object> variables;
|
||||
|
||||
@Valid
|
||||
@NotEmpty
|
||||
|
||||
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
|
||||
List<Task> tasks;
|
||||
|
||||
@@ -125,7 +122,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
AbstractRetry retry;
|
||||
|
||||
@Valid
|
||||
@PluginProperty(beta = true)
|
||||
@PluginProperty
|
||||
List<SLA> sla;
|
||||
|
||||
public Stream<String> allTypes() {
|
||||
|
||||
@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Optional;
|
||||
@@ -57,6 +58,7 @@ public interface FlowId {
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
@EqualsAndHashCode
|
||||
class Default implements FlowId {
|
||||
private final String tenantId;
|
||||
private final String namespace;
|
||||
|
||||
@@ -31,6 +31,8 @@ public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface
|
||||
|
||||
Pattern YAML_REVISION_MATCHER = Pattern.compile("(?m)^revision: \\d+\n?");
|
||||
|
||||
String getDescription();
|
||||
|
||||
boolean isDisabled();
|
||||
|
||||
boolean isDeleted();
|
||||
|
||||
@@ -4,6 +4,8 @@ import com.fasterxml.jackson.annotation.JsonSetter;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import io.kestra.core.models.flows.input.*;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
@@ -16,6 +18,8 @@ import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
@@ -78,7 +82,7 @@ public abstract class Input<T> implements Data {
|
||||
@Schema(
|
||||
title = "The default value to use if no value is specified."
|
||||
)
|
||||
T defaults;
|
||||
Property<T> defaults;
|
||||
|
||||
@Schema(
|
||||
title = "The display name of the input."
|
||||
|
||||
@@ -43,4 +43,11 @@ public class Output implements Data {
|
||||
Type type;
|
||||
|
||||
String displayName;
|
||||
|
||||
/**
|
||||
* Specifies whether the output is required or not.
|
||||
* <p>
|
||||
* By default, an output is always required.
|
||||
*/
|
||||
Boolean required;
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ public class State {
|
||||
}
|
||||
|
||||
public Instant maxDate() {
|
||||
if (this.histories.size() == 0) {
|
||||
if (this.histories.isEmpty()) {
|
||||
return Instant.now();
|
||||
}
|
||||
|
||||
@@ -124,7 +124,7 @@ public class State {
|
||||
}
|
||||
|
||||
public Instant minDate() {
|
||||
if (this.histories.size() == 0) {
|
||||
if (this.histories.isEmpty()) {
|
||||
return Instant.now();
|
||||
}
|
||||
|
||||
@@ -173,6 +173,11 @@ public class State {
|
||||
return this.current.isBreakpoint();
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public boolean isQueued() {
|
||||
return this.current.isQueued();
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public boolean isRetrying() {
|
||||
return this.current.isRetrying();
|
||||
@@ -206,6 +211,14 @@ public class State {
|
||||
return this.histories.get(this.histories.size() - 2).state.isPaused();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if the execution has failed, then was restarted.
|
||||
* This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
|
||||
*/
|
||||
public boolean failedThenRestarted() {
|
||||
return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
|
||||
}
|
||||
|
||||
@Introspected
|
||||
public enum Type {
|
||||
CREATED,
|
||||
@@ -264,6 +277,10 @@ public class State {
|
||||
return this == Type.KILLED;
|
||||
}
|
||||
|
||||
public boolean isQueued(){
|
||||
return this == Type.QUEUED;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return states that are terminal to an execution
|
||||
*/
|
||||
|
||||
@@ -20,9 +20,8 @@ public class FileInput extends Input<URI> {
|
||||
|
||||
private static final String DEFAULT_EXTENSION = ".upl";
|
||||
|
||||
@Builder.Default
|
||||
@Deprecated(since = "0.24", forRemoval = true)
|
||||
public String extension = DEFAULT_EXTENSION;
|
||||
public String extension;
|
||||
|
||||
@Override
|
||||
public void validate(URI input) throws ConstraintViolationException {
|
||||
@@ -33,6 +32,7 @@ public class FileInput extends Input<URI> {
|
||||
String res = inputs.stream()
|
||||
.filter(in -> in instanceof FileInput)
|
||||
.filter(in -> in.getId().equals(fileName))
|
||||
.filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
|
||||
.map(flowInput -> ((FileInput) flowInput).getExtension())
|
||||
.findFirst()
|
||||
.orElse(FileInput.DEFAULT_EXTENSION);
|
||||
|
||||
@@ -6,19 +6,21 @@ import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
/**
|
||||
* Represents a
|
||||
* Represents an input along with its associated value and validation state.
|
||||
*
|
||||
* @param input The flow's {@link Input}.
|
||||
* @param value The flow's input value/data.
|
||||
* @param enabled Specify whether the input is enabled.
|
||||
* @param exception The input validation exception.
|
||||
* @param input The {@link Input} definition of the flow.
|
||||
* @param value The provided value for the input.
|
||||
* @param enabled {@code true} if the input is enabled; {@code false} otherwise.
|
||||
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
|
||||
* @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
|
||||
*/
|
||||
public record InputAndValue(
|
||||
Input<?> input,
|
||||
Object value,
|
||||
boolean enabled,
|
||||
boolean isDefault,
|
||||
ConstraintViolationException exception) {
|
||||
|
||||
|
||||
/**
|
||||
* Creates a new {@link InputAndValue} instance.
|
||||
*
|
||||
@@ -26,6 +28,6 @@ public record InputAndValue(
|
||||
* @param value The value.
|
||||
*/
|
||||
public InputAndValue(@NotNull Input<?> input, @Nullable Object value) {
|
||||
this(input, value, true, null);
|
||||
this(input, value, true, false, null);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,6 +68,19 @@ public class Property<T> {
|
||||
String getExpression() {
|
||||
return expression;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new {@link Property} with no cached rendered value,
|
||||
* so that the next render will evaluate its original Pebble expression.
|
||||
* <p>
|
||||
* The returned property will still cache its rendered result.
|
||||
* To re-evaluate on a subsequent render, call {@code skipCache()} again.
|
||||
*
|
||||
* @return a new {@link Property} without a pre-rendered value
|
||||
*/
|
||||
public Property<T> skipCache() {
|
||||
return Property.ofExpression(expression);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a new Property object with a value already set.<br>
|
||||
@@ -132,8 +145,8 @@ public class Property<T> {
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, RunContext runContext, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return as(property, runContext, clazz, Map.of());
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return as(property, context, clazz, Map.of());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -143,9 +156,9 @@ public class Property<T> {
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, RunContext runContext, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.value == null) {
|
||||
String rendered = runContext.render(property.expression, variables);
|
||||
String rendered = context.render(property.expression, variables);
|
||||
property.value = MAPPER.convertValue(rendered, clazz);
|
||||
}
|
||||
|
||||
@@ -159,8 +172,8 @@ public class Property<T> {
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
|
||||
*/
|
||||
public static <T, I> T asList(Property<T> property, RunContext runContext, Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
||||
return asList(property, runContext, itemClazz, Map.of());
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
||||
return asList(property, context, itemClazz, Map.of());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -171,7 +184,7 @@ public class Property<T> {
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T, I> T asList(Property<T> property, RunContext runContext, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.value == null) {
|
||||
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
|
||||
try {
|
||||
@@ -179,7 +192,7 @@ public class Property<T> {
|
||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = MAPPER.readValue(runContext.render(property.expression, variables), type);
|
||||
property.value = MAPPER.readValue(context.render(property.expression, variables), type);
|
||||
}
|
||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||
else {
|
||||
@@ -187,9 +200,9 @@ public class Property<T> {
|
||||
property.value = (T) asRawList.stream()
|
||||
.map(throwFunction(item -> {
|
||||
if (item instanceof String str) {
|
||||
return MAPPER.convertValue(runContext.render(str, variables), itemClazz);
|
||||
return MAPPER.convertValue(context.render(str, variables), itemClazz);
|
||||
} else if (item instanceof Map map) {
|
||||
return MAPPER.convertValue(runContext.render(map, variables), itemClazz);
|
||||
return MAPPER.convertValue(context.render(map, variables), itemClazz);
|
||||
}
|
||||
return item;
|
||||
}))
|
||||
|
||||
@@ -0,0 +1,38 @@
|
||||
package io.kestra.core.models.property;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Contextual object for rendering properties.
|
||||
*
|
||||
* @see Property
|
||||
*/
|
||||
public interface PropertyContext {
|
||||
|
||||
String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException;
|
||||
|
||||
Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException;
|
||||
|
||||
/**
|
||||
* Static helper method for creating a new {@link PropertyContext} from a given {@link VariableRenderer}.
|
||||
*
|
||||
* @param renderer the {@link VariableRenderer}.
|
||||
* @return a new {@link PropertyContext}.
|
||||
*/
|
||||
static PropertyContext create(final VariableRenderer renderer) {
|
||||
return new PropertyContext() {
|
||||
@Override
|
||||
public String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
return renderer.render(inline, variables);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
return renderer.render(inline, variables);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -222,6 +222,7 @@ public class Trigger extends TriggerContext implements HasUID {
|
||||
}
|
||||
// If trigger is a schedule and execution ended after the next execution date
|
||||
else if (abstractTrigger instanceof Schedule schedule &&
|
||||
this.getNextExecutionDate() != null &&
|
||||
execution.getState().getEndDate().get().isAfter(this.getNextExecutionDate().toInstant())
|
||||
) {
|
||||
RecoverMissedSchedules recoverMissedSchedules = Optional.ofNullable(schedule.getRecoverMissedSchedules())
|
||||
|
||||
@@ -5,11 +5,9 @@ import io.kestra.core.models.executions.ExecutionKilled;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.executions.MetricEntry;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
import io.kestra.core.models.triggers.Trigger;
|
||||
import io.kestra.core.runners.*;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.templates.Template;
|
||||
|
||||
public interface QueueFactoryInterface {
|
||||
String EXECUTION_NAMED = "executionQueue";
|
||||
@@ -28,12 +26,13 @@ public interface QueueFactoryInterface {
|
||||
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
|
||||
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
|
||||
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
|
||||
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
|
||||
|
||||
QueueInterface<Execution> execution();
|
||||
|
||||
QueueInterface<Executor> executor();
|
||||
|
||||
QueueInterface<WorkerJob> workerJob();
|
||||
WorkerJobQueueInterface workerJob();
|
||||
|
||||
QueueInterface<WorkerTaskResult> workerTaskResult();
|
||||
|
||||
@@ -58,4 +57,6 @@ public interface QueueFactoryInterface {
|
||||
QueueInterface<SubflowExecutionResult> subflowExecutionResult();
|
||||
|
||||
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
|
||||
|
||||
QueueInterface<ExecutionRunning> executionRunning();
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
|
||||
void delete(String consumerGroup, T message) throws QueueException;
|
||||
|
||||
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
|
||||
return receive((String) null, consumer);
|
||||
return receive(null, consumer, false);
|
||||
}
|
||||
|
||||
default Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {
|
||||
|
||||
@@ -27,8 +27,6 @@ public class QueueService {
|
||||
return ((Executor) object).getExecution().getId();
|
||||
} else if (object.getClass() == MetricEntry.class) {
|
||||
return null;
|
||||
} else if (object.getClass() == ExecutionRunning.class) {
|
||||
return ((ExecutionRunning) object).getExecution().getId();
|
||||
} else if (object.getClass() == SubflowExecutionEnd.class) {
|
||||
return ((SubflowExecutionEnd) object).getParentExecutionId();
|
||||
} else {
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
package io.kestra.core.queues;
|
||||
|
||||
import java.io.Serial;
|
||||
|
||||
public class UnsupportedMessageException extends QueueException {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public UnsupportedMessageException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package io.kestra.core.queues;
|
||||
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.runners.WorkerJob;
|
||||
import io.kestra.core.utils.Either;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public interface WorkerJobQueueInterface extends QueueInterface<WorkerJob> {
|
||||
|
||||
Runnable subscribe(String workerId, String workerGroup, Consumer<Either<WorkerJob, DeserializationException>> consumer);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
public abstract class AbstractReportable<T extends Reportable.Event> implements Reportable<T> {
|
||||
|
||||
private final Type type;
|
||||
private final ReportingSchedule schedule;
|
||||
private final boolean isTenantSupported;
|
||||
|
||||
public AbstractReportable(Type type, ReportingSchedule schedule, boolean isTenantSupported) {
|
||||
this.type = type;
|
||||
this.schedule = schedule;
|
||||
this.isTenantSupported = isTenantSupported;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTenantSupported() {
|
||||
return isTenantSupported;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type type() {
|
||||
return type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReportingSchedule schedule() {
|
||||
return schedule;
|
||||
}
|
||||
}
|
||||
94
core/src/main/java/io/kestra/core/reporter/Reportable.java
Normal file
94
core/src/main/java/io/kestra/core/reporter/Reportable.java
Normal file
@@ -0,0 +1,94 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
|
||||
/**
|
||||
* Interface for reporting server event for a specific type.
|
||||
*
|
||||
* @param <T>
|
||||
*/
|
||||
public interface Reportable<T extends Reportable.Event> {
|
||||
|
||||
/**
|
||||
* Gets the type of the event to report.
|
||||
*/
|
||||
Type type();
|
||||
|
||||
/**
|
||||
* Gets the reporting schedule.
|
||||
*/
|
||||
ReportingSchedule schedule();
|
||||
|
||||
/**
|
||||
* Generates a report for the given timestamp.
|
||||
*
|
||||
* @param now the time when the report is triggered.
|
||||
* @return an Optional containing the report data if available.
|
||||
*/
|
||||
T report(Instant now, TimeInterval interval);
|
||||
|
||||
default T report(Instant now) {
|
||||
ZonedDateTime to = now.atZone(ZoneId.systemDefault());
|
||||
ZonedDateTime from = to.minus(Duration.ofDays(1));
|
||||
return report(now, new TimeInterval(from, to));
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether this reportable is enabled for scheduled reporting.
|
||||
*/
|
||||
boolean isEnabled();
|
||||
|
||||
/**
|
||||
* Generates a report for the given timestamp and tenant.
|
||||
*
|
||||
* @param now the time when the report is triggered.
|
||||
* @param tenant the tenant for which the report is triggered.
|
||||
* @return the event to report.
|
||||
*/
|
||||
default T report(Instant now, TimeInterval interval, String tenant) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
default T report(Instant now, String tenant) {
|
||||
ZonedDateTime to = now.atZone(ZoneId.systemDefault());
|
||||
ZonedDateTime from = to.minus(Duration.ofDays(1));
|
||||
return report(now, new TimeInterval(from, to), tenant);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether this {@link Reportable} can accept a tenant.
|
||||
*
|
||||
* @return {@code true} a {@link #report(Instant, TimeInterval, String)} can called, Otherwise {@code false}.
|
||||
*/
|
||||
default boolean isTenantSupported() {
|
||||
return false;
|
||||
}
|
||||
|
||||
record TimeInterval(ZonedDateTime from, ZonedDateTime to){
|
||||
public static TimeInterval of(ZonedDateTime from, ZonedDateTime to) {
|
||||
return new TimeInterval(from, to);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Marker interface indicating that the returned event
|
||||
* must be a structured, domain-specific object
|
||||
* (not a primitive wrapper, String, collection, or other basic type).
|
||||
*/
|
||||
interface Event {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines the schedule for a report.
|
||||
*/
|
||||
interface ReportingSchedule {
|
||||
/**
|
||||
* Determines whether a report should run at the given instant.
|
||||
*/
|
||||
boolean shouldRun(Instant now);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class ReportableRegistry {
|
||||
|
||||
private final Map<Type, Reportable<?>> reportables = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* Creates a new {@link ReportableRegistry} instance.
|
||||
*
|
||||
* @param reportables The {@link Reportable reportables}
|
||||
*/
|
||||
@Inject
|
||||
public ReportableRegistry(final List<Reportable<?>> reportables) {
|
||||
reportables.forEach(reportable -> this.reportables.put(reportable.type(), reportable));
|
||||
}
|
||||
|
||||
public void register(final Reportable<?> reportable) {
|
||||
Objects.requireNonNull(reportable, "reportable must not be null");
|
||||
if (reportables.containsKey(reportable.type())) {
|
||||
log.warn("Event already registered for type '{}'", reportable.type());
|
||||
} else {
|
||||
reportables.put(reportable.type(), reportable);
|
||||
}
|
||||
}
|
||||
|
||||
public List<Reportable<?>> getAll() {
|
||||
return List.copyOf(reportables.values());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.scheduling.annotation.Scheduled;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.time.Instant;
|
||||
|
||||
@Singleton
|
||||
@Requires(property = "kestra.anonymous-usage-report.enabled", value = "true")
|
||||
@Requires(property = "kestra.server-type")
|
||||
@Slf4j
|
||||
public class ReportableScheduler {
|
||||
|
||||
private final ReportableRegistry registry;
|
||||
private final ServerEventSender sender;
|
||||
private final Clock clock;
|
||||
|
||||
@Inject
|
||||
public ReportableScheduler(ReportableRegistry registry, ServerEventSender sender) {
|
||||
this.registry = registry;
|
||||
this.sender = sender;
|
||||
this.clock = Clock.systemDefaultZone();
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = "5m", initialDelay = "${kestra.anonymous-usage-report.initial-delay}")
|
||||
public void tick() {
|
||||
Instant now = clock.instant();
|
||||
for (Reportable<?> r : registry.getAll()) {
|
||||
if (r.isEnabled() && r.schedule().shouldRun(now)) {
|
||||
try {
|
||||
Object value = r.report(now);
|
||||
if (value != null) sender.send(now, r.type(), value);
|
||||
} catch (Exception e) {
|
||||
log.debug("Failed to send report for event-type '{}'", r.type(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
57
core/src/main/java/io/kestra/core/reporter/Schedules.java
Normal file
57
core/src/main/java/io/kestra/core/reporter/Schedules.java
Normal file
@@ -0,0 +1,57 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
import io.kestra.core.reporter.Reportable.ReportingSchedule;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
/**
|
||||
* Utility class providing common implementations of {@link Reportable.ReportingSchedule}.
|
||||
*/
|
||||
public class Schedules {
|
||||
|
||||
/**
|
||||
* Creates a reporting schedule that triggers after the specified period has elapsed
|
||||
* since the last execution.
|
||||
*
|
||||
* @param period the duration between successive runs; must be positive
|
||||
* @return a {@link Reportable.ReportingSchedule} that runs at the given interval
|
||||
* @throws IllegalArgumentException if {@code period} is zero or negative
|
||||
*/
|
||||
public static ReportingSchedule every(final Duration period) {
|
||||
if (period.isZero() || period.isNegative()) {
|
||||
throw new IllegalArgumentException("Period must be positive");
|
||||
}
|
||||
|
||||
return new ReportingSchedule() {
|
||||
private Instant lastRun = Instant.EPOCH;
|
||||
|
||||
@Override
|
||||
public boolean shouldRun(Instant now) {
|
||||
if (Duration.between(lastRun, now).compareTo(period) >= 0) {
|
||||
lastRun = now;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a reporting schedule that triggers once every hour.
|
||||
*
|
||||
* @return a schedule running every 1 hour
|
||||
*/
|
||||
public static ReportingSchedule hourly() {
|
||||
return every(Duration.ofHours(1));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a reporting schedule that triggers once every day.
|
||||
*
|
||||
* @return a schedule running every 24 hours
|
||||
*/
|
||||
public static ReportingSchedule daily() {
|
||||
return every(Duration.ofDays(1));
|
||||
}
|
||||
}
|
||||
31
core/src/main/java/io/kestra/core/reporter/ServerEvent.java
Normal file
31
core/src/main/java/io/kestra/core/reporter/ServerEvent.java
Normal file
@@ -0,0 +1,31 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonUnwrapped;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
|
||||
/**
|
||||
* Represents a Kestra Server Event.
|
||||
*/
|
||||
@Builder(toBuilder = true)
|
||||
public record ServerEvent(
|
||||
String instanceUuid,
|
||||
String sessionUuid,
|
||||
ServerType serverType,
|
||||
String serverVersion,
|
||||
ZoneId zoneId,
|
||||
Object payload,
|
||||
String uuid,
|
||||
ZonedDateTime reportedAt
|
||||
) {
|
||||
|
||||
@JsonUnwrapped
|
||||
public Object payload() {
|
||||
return payload;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,91 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.models.collectors.Result;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.services.InstanceService;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.VersionProvider;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.micronaut.core.type.Argument;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.annotation.Client;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.hateoas.JsonError;
|
||||
import io.micronaut.reactor.http.client.ReactorHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.net.URI;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.util.UUID;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class ServerEventSender {
|
||||
|
||||
private static final String SESSION_UUID = IdUtils.create();
|
||||
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();
|
||||
|
||||
@Inject
|
||||
@Client
|
||||
private ReactorHttpClient client;
|
||||
|
||||
@Inject
|
||||
private VersionProvider versionProvider;
|
||||
|
||||
@Inject
|
||||
private InstanceService instanceService;
|
||||
|
||||
private final ServerType serverType;
|
||||
|
||||
@Value("${kestra.anonymous-usage-report.uri}")
|
||||
protected URI url;
|
||||
|
||||
public ServerEventSender( ) {
|
||||
this.serverType = KestraContext.getContext().getServerType();
|
||||
}
|
||||
|
||||
public void send(final Instant now, final Type type, Object event) {
|
||||
ServerEvent serverEvent = ServerEvent
|
||||
.builder()
|
||||
.uuid(UUID.randomUUID().toString())
|
||||
.sessionUuid(SESSION_UUID)
|
||||
.instanceUuid(instanceService.fetch())
|
||||
.serverType(serverType)
|
||||
.serverVersion(versionProvider.getVersion())
|
||||
.reportedAt(now.atZone(ZoneId.systemDefault()))
|
||||
.payload(event)
|
||||
.zoneId(ZoneId.systemDefault())
|
||||
.build();
|
||||
try {
|
||||
MutableHttpRequest<ServerEvent> request = this.request(serverEvent, type);
|
||||
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Report anonymous usage: '{}'", OBJECT_MAPPER.writeValueAsString(serverEvent));
|
||||
}
|
||||
|
||||
this.handleResponse(client.toBlocking().retrieve(request, Argument.of(Result.class), Argument.of(JsonError.class)));
|
||||
} catch (HttpClientResponseException t) {
|
||||
log.trace("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
|
||||
} catch (Exception t) {
|
||||
log.trace("Unable to handle anonymous usage", t);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleResponse (Result result){
|
||||
|
||||
}
|
||||
|
||||
protected MutableHttpRequest<ServerEvent> request(ServerEvent event, Type type) throws Exception {
|
||||
URI baseUri = URI.create(this.url.toString().endsWith("/") ? this.url.toString() : this.url + "/");
|
||||
URI resolvedUri = baseUri.resolve(type.name().toLowerCase());
|
||||
return HttpRequest.POST(resolvedUri, event)
|
||||
.header("User-Agent", "Kestra/" + versionProvider.getVersion());
|
||||
}
|
||||
}
|
||||
9
core/src/main/java/io/kestra/core/reporter/Type.java
Normal file
9
core/src/main/java/io/kestra/core/reporter/Type.java
Normal file
@@ -0,0 +1,9 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
/**
|
||||
* A reportable event type.
|
||||
*/
|
||||
public interface Type {
|
||||
|
||||
String name();
|
||||
}
|
||||
12
core/src/main/java/io/kestra/core/reporter/Types.java
Normal file
12
core/src/main/java/io/kestra/core/reporter/Types.java
Normal file
@@ -0,0 +1,12 @@
|
||||
package io.kestra.core.reporter;
|
||||
|
||||
/**
|
||||
* All supported reportable event type.
|
||||
*/
|
||||
public enum Types implements Type {
|
||||
USAGE,
|
||||
SYSTEM_INFORMATION,
|
||||
PLUGIN_METRICS,
|
||||
SERVICE_USAGE,
|
||||
PLUGIN_USAGE;
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package io.kestra.core.reporter.model;
|
||||
|
||||
public record Count(
|
||||
long count
|
||||
) {
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.models.collectors.ExecutionUsage;
|
||||
import io.kestra.core.models.collectors.FlowUsage;
|
||||
import io.kestra.core.reporter.AbstractReportable;
|
||||
import io.kestra.core.reporter.Schedules;
|
||||
import io.kestra.core.reporter.Types;
|
||||
import io.kestra.core.reporter.model.Count;
|
||||
import io.kestra.core.repositories.DashboardRepositoryInterface;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.Getter;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Objects;
|
||||
|
||||
@Singleton
|
||||
public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.UsageEvent> {
|
||||
|
||||
private final FlowRepositoryInterface flowRepository;
|
||||
private final ExecutionRepositoryInterface executionRepository;
|
||||
private final DashboardRepositoryInterface dashboardRepository;
|
||||
private final boolean enabled;
|
||||
|
||||
@Inject
|
||||
public FeatureUsageReport(FlowRepositoryInterface flowRepository,
|
||||
ExecutionRepositoryInterface executionRepository,
|
||||
DashboardRepositoryInterface dashboardRepository) {
|
||||
super(Types.USAGE, Schedules.hourly(), true);
|
||||
this.flowRepository = flowRepository;
|
||||
this.executionRepository = executionRepository;
|
||||
this.dashboardRepository = dashboardRepository;
|
||||
|
||||
ServerType serverType = KestraContext.getContext().getServerType();
|
||||
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public UsageEvent report(final Instant now, TimeInterval interval) {
|
||||
return UsageEvent
|
||||
.builder()
|
||||
.flows(FlowUsage.of(flowRepository))
|
||||
.executions(ExecutionUsage.of(executionRepository, interval.from(), interval.to()))
|
||||
.dashboards(new Count(dashboardRepository.count()))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UsageEvent report(Instant now, TimeInterval interval, String tenant) {
|
||||
Objects.requireNonNull(tenant, "tenant is null");
|
||||
Objects.requireNonNull(interval, "interval is null");
|
||||
return UsageEvent
|
||||
.builder()
|
||||
.flows(FlowUsage.of(tenant, flowRepository))
|
||||
.executions(ExecutionUsage.of(tenant, executionRepository, interval.from(), interval.to()))
|
||||
.build();
|
||||
}
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@Jacksonized
|
||||
@Introspected
|
||||
public static class UsageEvent implements Event {
|
||||
private ExecutionUsage executions;
|
||||
private FlowUsage flows;
|
||||
private Count dashboards;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,105 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.models.collectors.PluginMetric;
|
||||
import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.reporter.AbstractReportable;
|
||||
import io.kestra.core.reporter.Schedules;
|
||||
import io.kestra.core.reporter.Types;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Singleton
|
||||
public class PluginMetricReport extends AbstractReportable<PluginMetricReport.PluginMetricEvent> {
|
||||
|
||||
private final PluginRegistry pluginRegistry;
|
||||
private final MetricRegistry metricRegistry;
|
||||
private final boolean enabled;
|
||||
|
||||
@Inject
|
||||
public PluginMetricReport(PluginRegistry pluginRegistry,
|
||||
MetricRegistry metricRegistry) {
|
||||
super(Types.PLUGIN_METRICS, Schedules.daily(), false);
|
||||
this.metricRegistry = metricRegistry;
|
||||
this.pluginRegistry = pluginRegistry;
|
||||
|
||||
ServerType serverType = KestraContext.getContext().getServerType();
|
||||
this.enabled = ServerType.SCHEDULER.equals(serverType) || ServerType.WORKER.equals(serverType) || ServerType.STANDALONE.equals(serverType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PluginMetricEvent report(final Instant now, final TimeInterval period) {
|
||||
return PluginMetricEvent
|
||||
.builder()
|
||||
.pluginMetrics(pluginMetrics())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Introspected
|
||||
public record PluginMetricEvent (
|
||||
List<PluginMetric> pluginMetrics
|
||||
) implements Event {
|
||||
}
|
||||
|
||||
private List<PluginMetric> pluginMetrics() {
|
||||
List<PluginMetric> taskMetrics = pluginRegistry.plugins().stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getTasks().stream())
|
||||
.map(Class::getName)
|
||||
.map(this::taskMetric)
|
||||
.flatMap(Optional::stream)
|
||||
.toList();
|
||||
|
||||
List<PluginMetric> triggerMetrics = pluginRegistry.plugins().stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream())
|
||||
.map(Class::getName)
|
||||
.map(this::triggerMetric)
|
||||
.flatMap(Optional::stream)
|
||||
.toList();
|
||||
|
||||
return ListUtils.concat(taskMetrics, triggerMetrics);
|
||||
}
|
||||
|
||||
private Optional<PluginMetric> taskMetric(String type) {
|
||||
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_ENDED_DURATION).tag(MetricRegistry.TAG_TASK_TYPE, type).timer();
|
||||
return fromTimer(type, duration);
|
||||
}
|
||||
|
||||
private Optional<PluginMetric> triggerMetric(String type) {
|
||||
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
|
||||
|
||||
if (duration == null) {
|
||||
// this may be because this is a trigger executed by the scheduler, we search there instead
|
||||
duration = metricRegistry.find(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
|
||||
}
|
||||
return fromTimer(type, duration);
|
||||
}
|
||||
|
||||
private Optional<PluginMetric> fromTimer(String type, Timer timer) {
|
||||
if (timer == null || timer.count() == 0) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
double count = timer.count();
|
||||
double totalTime = timer.totalTime(TimeUnit.MILLISECONDS);
|
||||
double meanTime = timer.mean(TimeUnit.MILLISECONDS);
|
||||
|
||||
return Optional.of(new PluginMetric(type, count, totalTime, meanTime));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.models.collectors.PluginUsage;
|
||||
import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.reporter.AbstractReportable;
|
||||
import io.kestra.core.reporter.Schedules;
|
||||
import io.kestra.core.reporter.Types;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
|
||||
@Singleton
|
||||
public class PluginUsageReport extends AbstractReportable<PluginUsageReport.PluginUsageEvent> {
|
||||
|
||||
private final PluginRegistry pluginRegistry;
|
||||
private final boolean enabled;
|
||||
@Inject
|
||||
public PluginUsageReport(PluginRegistry pluginRegistry) {
|
||||
super(Types.PLUGIN_USAGE, Schedules.daily(), false);
|
||||
this.pluginRegistry = pluginRegistry;
|
||||
|
||||
ServerType serverType = KestraContext.getContext().getServerType();
|
||||
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PluginUsageEvent report(final Instant now, final TimeInterval period) {
|
||||
return PluginUsageEvent
|
||||
.builder()
|
||||
.plugins(PluginUsage.of(pluginRegistry))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Introspected
|
||||
public record PluginUsageEvent(
|
||||
List<PluginUsage> plugins
|
||||
) implements Event {
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.contexts.KestraContext;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.models.collectors.ServiceUsage;
|
||||
import io.kestra.core.reporter.AbstractReportable;
|
||||
import io.kestra.core.reporter.Schedules;
|
||||
import io.kestra.core.reporter.Types;
|
||||
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
@Singleton
|
||||
public class ServiceUsageReport extends AbstractReportable<ServiceUsageReport.ServiceUsageEvent> {
|
||||
|
||||
private final ServiceInstanceRepositoryInterface serviceInstanceRepository;
|
||||
private final boolean isEnabled;
|
||||
|
||||
@Inject
|
||||
public ServiceUsageReport(ServiceInstanceRepositoryInterface serviceInstanceRepository) {
|
||||
super(Types.SERVICE_USAGE, Schedules.daily(), false);
|
||||
this.serviceInstanceRepository = serviceInstanceRepository;
|
||||
|
||||
ServerType serverType = KestraContext.getContext().getServerType();
|
||||
this.isEnabled = ServerType.STANDALONE.equals(serverType) || ServerType.EXECUTOR.equals(serverType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceUsageEvent report(final Instant now, final TimeInterval period) {
|
||||
|
||||
return ServiceUsageEvent
|
||||
.builder()
|
||||
.services(ServiceUsage.of(period.from().toInstant(), period.to().toInstant(), serviceInstanceRepository, Duration.ofMinutes(5)))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return isEnabled;
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Introspected
|
||||
public record ServiceUsageEvent(
|
||||
ServiceUsage services
|
||||
) implements Event {
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
package io.kestra.core.reporter.reports;
|
||||
|
||||
import io.kestra.core.models.collectors.ConfigurationUsage;
|
||||
import io.kestra.core.models.collectors.HostUsage;
|
||||
import io.kestra.core.reporter.AbstractReportable;
|
||||
import io.kestra.core.reporter.Schedules;
|
||||
import io.kestra.core.reporter.Types;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.Builder;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.time.Instant;
|
||||
import java.util.Set;
|
||||
|
||||
@Singleton
|
||||
public class SystemInformationReport extends AbstractReportable<SystemInformationReport.SystemInformationEvent> {
|
||||
|
||||
private final Environment environment;
|
||||
private final ApplicationContext applicationContext;
|
||||
private final String kestraUrl;
|
||||
private final Instant startTime;
|
||||
|
||||
@Inject
|
||||
public SystemInformationReport(ApplicationContext applicationContext) {
|
||||
super(Types.SYSTEM_INFORMATION, Schedules.daily(), false);
|
||||
this.environment = applicationContext.getEnvironment();
|
||||
this.applicationContext = applicationContext;
|
||||
this.kestraUrl = applicationContext.getProperty("kestra.url", String.class).orElse(null);
|
||||
this.startTime = Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime());
|
||||
}
|
||||
|
||||
@Override
|
||||
public SystemInformationEvent report(final Instant now, final TimeInterval timeInterval) {
|
||||
return SystemInformationEvent
|
||||
.builder()
|
||||
.environments(environment.getActiveNames())
|
||||
.configurations(ConfigurationUsage.of(applicationContext))
|
||||
.startTime(startTime)
|
||||
.host(HostUsage.of())
|
||||
.uri(kestraUrl)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Builder
|
||||
@Introspected
|
||||
public record SystemInformationEvent(
|
||||
Set<String> environments,
|
||||
HostUsage host,
|
||||
ConfigurationUsage configurations,
|
||||
Instant startTime,
|
||||
String uri
|
||||
) implements Event {
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,14 @@ import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface DashboardRepositoryInterface {
|
||||
|
||||
/**
|
||||
* Gets the total number of Dashboards.
|
||||
*
|
||||
* @return the total number.
|
||||
*/
|
||||
long count();
|
||||
|
||||
Boolean isEnabled();
|
||||
|
||||
Optional<Dashboard> get(String tenantId, String id);
|
||||
|
||||
@@ -161,7 +161,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
|
||||
}
|
||||
|
||||
List<Execution> lastExecutions(
|
||||
@Nullable String tenantId,
|
||||
String tenantId,
|
||||
@Nullable List<FlowFilter> flows
|
||||
);
|
||||
}
|
||||
|
||||
@@ -81,9 +81,7 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
|
||||
|
||||
Flux<LogEntry> findAsync(
|
||||
@Nullable String tenantId,
|
||||
@Nullable String namespace,
|
||||
@Nullable Level minLevel,
|
||||
ZonedDateTime startDate
|
||||
List<QueryFilter> filters
|
||||
);
|
||||
|
||||
Flux<LogEntry> findAllAsync(@Nullable String tenantId);
|
||||
@@ -96,5 +94,7 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
|
||||
|
||||
void deleteByQuery(String tenantId, String namespace, String flowId, String triggerId);
|
||||
|
||||
void deleteByFilters(String tenantId, List<QueryFilter> filters);
|
||||
|
||||
int deleteByQuery(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate);
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.utils.Exceptions;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.Synchronized;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
@@ -32,6 +33,7 @@ public abstract class AbstractWorkerCallable implements Callable<State.Type> {
|
||||
String uid;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
Throwable exception;
|
||||
|
||||
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
|
||||
@@ -80,7 +82,7 @@ public abstract class AbstractWorkerCallable implements Callable<State.Type> {
|
||||
*
|
||||
* @see WorkerJobLifecycle#stop()
|
||||
*/
|
||||
protected abstract void signalStop();
|
||||
public abstract void signalStop();
|
||||
|
||||
/**
|
||||
* Wait for this worker task to complete stopping.
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -11,7 +12,7 @@ import lombok.With;
|
||||
@Value
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class ExecutionRunning {
|
||||
public class ExecutionRunning implements HasUID {
|
||||
String tenantId;
|
||||
|
||||
@NotNull
|
||||
@@ -26,6 +27,7 @@ public class ExecutionRunning {
|
||||
@With
|
||||
ConcurrencyState concurrencyState;
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId, this.execution.getId());
|
||||
}
|
||||
|
||||
@@ -86,7 +86,7 @@ public class Executor {
|
||||
|
||||
public Boolean canBeProcessed() {
|
||||
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
|
||||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint());
|
||||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
|
||||
}
|
||||
|
||||
public Executor withFlow(FlowWithSource flow) {
|
||||
|
||||
@@ -102,49 +102,39 @@ public class ExecutorService {
|
||||
return this.flowExecutorInterface;
|
||||
}
|
||||
|
||||
public Executor checkConcurrencyLimit(Executor executor, FlowInterface flow, Execution execution, long count) {
|
||||
// if above the limit, handle concurrency limit based on its behavior
|
||||
if (count >= flow.getConcurrency().getLimit()) {
|
||||
public ExecutionRunning processExecutionRunning(FlowInterface flow, int runningCount, ExecutionRunning executionRunning) {
|
||||
// if concurrency was removed, it can be null as we always get the latest flow definition
|
||||
if (flow.getConcurrency() != null && runningCount >= flow.getConcurrency().getLimit()) {
|
||||
return switch (flow.getConcurrency().getBehavior()) {
|
||||
case QUEUE -> {
|
||||
var newExecution = execution.withState(State.Type.QUEUED);
|
||||
|
||||
ExecutionRunning executionRunning = ExecutionRunning.builder()
|
||||
.tenantId(flow.getTenantId())
|
||||
.namespace(flow.getNamespace())
|
||||
.flowId(flow.getId())
|
||||
.execution(newExecution)
|
||||
.concurrencyState(ExecutionRunning.ConcurrencyState.QUEUED)
|
||||
.build();
|
||||
|
||||
// when max concurrency is reached, we throttle the execution and stop processing
|
||||
logService.logExecution(
|
||||
newExecution,
|
||||
executionRunning.getExecution(),
|
||||
Level.INFO,
|
||||
"Flow is queued due to concurrency limit exceeded, {} running(s)",
|
||||
count
|
||||
"Execution is queued due to concurrency limit exceeded, {} running(s)",
|
||||
runningCount
|
||||
);
|
||||
// return the execution queued
|
||||
yield executor
|
||||
.withExecutionRunning(executionRunning)
|
||||
.withExecution(newExecution, "checkConcurrencyLimit");
|
||||
var newExecution = executionRunning.getExecution().withState(State.Type.QUEUED);
|
||||
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
|
||||
yield executionRunning
|
||||
.withExecution(newExecution)
|
||||
.withConcurrencyState(ExecutionRunning.ConcurrencyState.QUEUED);
|
||||
}
|
||||
case CANCEL ->
|
||||
executor.withExecution(execution.withState(State.Type.CANCELLED), "checkConcurrencyLimit");
|
||||
executionRunning
|
||||
.withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
|
||||
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
|
||||
case FAIL ->
|
||||
executor.withException(new IllegalStateException("Flow is FAILED due to concurrency limit exceeded"), "checkConcurrencyLimit");
|
||||
executionRunning
|
||||
.withExecution(executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded")).getExecution())
|
||||
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
// if under the limit, update the executor with a RUNNING ExecutionRunning to track them
|
||||
var executionRunning = new ExecutionRunning(
|
||||
flow.getTenantId(),
|
||||
flow.getNamespace(),
|
||||
flow.getId(),
|
||||
executor.getExecution(),
|
||||
ExecutionRunning.ConcurrencyState.RUNNING
|
||||
);
|
||||
return executor.withExecutionRunning(executionRunning);
|
||||
// if under the limit, run it!
|
||||
return executionRunning
|
||||
.withExecution(executionRunning.getExecution().withState(State.Type.RUNNING))
|
||||
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
|
||||
}
|
||||
|
||||
public Executor process(Executor executor) {
|
||||
@@ -247,9 +237,9 @@ public class ExecutorService {
|
||||
try {
|
||||
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
|
||||
} catch (Exception e) {
|
||||
// This will lead to the next task being still executed but at least Kestra will not crash.
|
||||
// This will lead to the next task being still executed, but at least Kestra will not crash.
|
||||
// This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
|
||||
runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), e);
|
||||
runContext.logger().error("Unable to resolve state from the Flowable task: {}", e.getMessage(), e);
|
||||
state = Optional.of(State.Type.FAILED);
|
||||
}
|
||||
Optional<WorkerTaskResult> endedTask = childWorkerTaskTypeToWorkerTask(
|
||||
@@ -390,11 +380,9 @@ public class ExecutorService {
|
||||
|
||||
if (flow.getOutputs() != null) {
|
||||
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
|
||||
|
||||
try {
|
||||
Map<String, Object> outputs = flow.getOutputs()
|
||||
.stream()
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||
outputs = runContext.render(outputs);
|
||||
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
|
||||
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
|
||||
newExecution = newExecution.withOutputs(outputs);
|
||||
} catch (Exception e) {
|
||||
@@ -599,6 +587,23 @@ public class ExecutorService {
|
||||
list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId()))
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
}
|
||||
|
||||
// If the task is a flowable and its terminated, check that all children are terminated.
|
||||
// This may not be the case for parallel flowable tasks like Parallel, Dag, ForEach...
|
||||
// After a fail task, some child flowable may not be correctly terminated.
|
||||
if (task instanceof FlowableTask<?> && taskRun.getState().isTerminated()) {
|
||||
List<TaskRun> updated = executor.getExecution().findChildren(taskRun).stream()
|
||||
.filter(child -> !child.getState().isTerminated())
|
||||
.map(throwFunction(child -> child.withState(taskRun.getState().getCurrent())))
|
||||
.toList();
|
||||
if (!updated.isEmpty()) {
|
||||
Execution execution = executor.getExecution();
|
||||
for (TaskRun child : updated) {
|
||||
execution = execution.withTaskRun(child);
|
||||
}
|
||||
executor = executor.withExecution(execution, "handledTerminatedFlowableTasks");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metricRegistry
|
||||
|
||||
@@ -2,7 +2,6 @@ package io.kestra.core.runners;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
@@ -12,11 +11,14 @@ import io.kestra.core.models.flows.DependsOn;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
import io.kestra.core.models.flows.RenderableInput;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.FileInput;
|
||||
import io.kestra.core.models.flows.input.InputAndValue;
|
||||
import io.kestra.core.models.flows.input.ItemTypeInterface;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.property.URIFetcher;
|
||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
@@ -75,16 +77,19 @@ public class FlowInputOutput {
|
||||
private final StorageInterface storageInterface;
|
||||
private final Optional<String> secretKey;
|
||||
private final RunContextFactory runContextFactory;
|
||||
private final VariableRenderer variableRenderer;
|
||||
|
||||
@Inject
|
||||
public FlowInputOutput(
|
||||
StorageInterface storageInterface,
|
||||
RunContextFactory runContextFactory,
|
||||
VariableRenderer variableRenderer,
|
||||
@Nullable @Value("${kestra.encryption.secret-key}") String secretKey
|
||||
) {
|
||||
this.storageInterface = storageInterface;
|
||||
this.runContextFactory = runContextFactory;
|
||||
this.secretKey = Optional.ofNullable(secretKey);
|
||||
this.variableRenderer = variableRenderer;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -249,11 +254,7 @@ public class FlowInputOutput {
|
||||
}
|
||||
|
||||
final Map<String, ResolvableInput> resolvableInputMap = Collections.unmodifiableMap(inputs.stream()
|
||||
.map(input -> {
|
||||
// get value or default
|
||||
Object value = Optional.ofNullable((Object) data.get(input.getId())).orElseGet(input::getDefaults);
|
||||
return ResolvableInput.of(input, value);
|
||||
})
|
||||
.map(input -> ResolvableInput.of(input,data.get(input.getId())))
|
||||
.collect(Collectors.toMap(it -> it.get().input().getId(), Function.identity(), (o1, o2) -> o1, LinkedHashMap::new)));
|
||||
|
||||
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap));
|
||||
@@ -312,8 +313,16 @@ public class FlowInputOutput {
|
||||
});
|
||||
resolvable.setInput(input);
|
||||
|
||||
|
||||
Object value = resolvable.get().value();
|
||||
|
||||
// resolve default if needed
|
||||
if (value == null && input.getDefaults() != null) {
|
||||
value = resolveDefaultValue(input, runContext);
|
||||
resolvable.isDefault(true);
|
||||
}
|
||||
|
||||
// validate and parse input value
|
||||
final Object value = resolvable.get().value();
|
||||
if (value == null) {
|
||||
if (input.getRequired()) {
|
||||
resolvable.resolveWithError(input.toConstraintViolationException("missing required input", null));
|
||||
@@ -341,7 +350,33 @@ public class FlowInputOutput {
|
||||
|
||||
return resolvable.get();
|
||||
}
|
||||
|
||||
|
||||
public static Object resolveDefaultValue(Input<?> input, PropertyContext renderer) throws IllegalVariableEvaluationException {
|
||||
return switch (input.getType()) {
|
||||
case STRING, ENUM, SELECT, SECRET, EMAIL -> resolveDefaultPropertyAs(input, renderer, String.class);
|
||||
case INT -> resolveDefaultPropertyAs(input, renderer, Integer.class);
|
||||
case FLOAT -> resolveDefaultPropertyAs(input, renderer, Float.class);
|
||||
case BOOLEAN, BOOL -> resolveDefaultPropertyAs(input, renderer, Boolean.class);
|
||||
case DATETIME -> resolveDefaultPropertyAs(input, renderer, Instant.class);
|
||||
case DATE -> resolveDefaultPropertyAs(input, renderer, LocalDate.class);
|
||||
case TIME -> resolveDefaultPropertyAs(input, renderer, LocalTime.class);
|
||||
case DURATION -> resolveDefaultPropertyAs(input, renderer, Duration.class);
|
||||
case FILE, URI -> resolveDefaultPropertyAs(input, renderer, URI.class);
|
||||
case JSON, YAML -> resolveDefaultPropertyAs(input, renderer, Object.class);
|
||||
case ARRAY -> resolveDefaultPropertyAsList(input, renderer, Object.class);
|
||||
case MULTISELECT -> resolveDefaultPropertyAsList(input, renderer, String.class);
|
||||
};
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return Property.as((Property<T>) input.getDefaults(), renderer, clazz);
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
|
||||
}
|
||||
|
||||
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies) {
|
||||
Map<String, Object> flattenInputs = MapUtils.flattenToNestedMap(dependencies.entrySet()
|
||||
.stream()
|
||||
@@ -368,7 +403,7 @@ public class FlowInputOutput {
|
||||
final Map<String, Object> in
|
||||
) {
|
||||
if (flow.getOutputs() == null) {
|
||||
return ImmutableMap.of();
|
||||
return Map.of();
|
||||
}
|
||||
Map<String, Object> results = flow
|
||||
.getOutputs()
|
||||
@@ -376,6 +411,9 @@ public class FlowInputOutput {
|
||||
.map(output -> {
|
||||
Object current = in == null ? null : in.get(output.getId());
|
||||
try {
|
||||
if (current == null && Boolean.FALSE.equals(output.getRequired())) {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(output.getId(), null));
|
||||
}
|
||||
return parseData(execution, output, current)
|
||||
.map(entry -> {
|
||||
if (output.getType().equals(Type.SECRET)) {
|
||||
@@ -406,7 +444,7 @@ public class FlowInputOutput {
|
||||
if (data.getType() == null) {
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(data.getId(), current));
|
||||
}
|
||||
|
||||
|
||||
final Type elementType = data instanceof ItemTypeInterface itemTypeInterface ? itemTypeInterface.getItemType() : null;
|
||||
|
||||
return Optional.of(new AbstractMap.SimpleEntry<>(
|
||||
@@ -483,6 +521,30 @@ public class FlowInputOutput {
|
||||
throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
|
||||
if (outputs == null) return Map.of();
|
||||
|
||||
// render required outputs
|
||||
Map<String, Object> outputsById = outputs
|
||||
.stream()
|
||||
.filter(output -> output.getRequired() == null || output.getRequired())
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||
outputsById = runContext.render(outputsById);
|
||||
|
||||
// render optional outputs one by one to catch, log, and skip any error.
|
||||
for (io.kestra.core.models.flows.Output output : outputs) {
|
||||
if (Boolean.FALSE.equals(output.getRequired())) {
|
||||
try {
|
||||
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
|
||||
outputsById.put(output.getId(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
return outputsById;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mutable wrapper to hold a flow's input, and it's resolved value.
|
||||
@@ -511,22 +573,26 @@ public class FlowInputOutput {
|
||||
return input;
|
||||
}
|
||||
|
||||
public void isDefault(boolean isDefault) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
|
||||
}
|
||||
|
||||
public void setInput(final Input<?> input) {
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.exception());
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
}
|
||||
|
||||
public void resolveWithEnabled(boolean enabled) {
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exception());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithValue(@Nullable Object value) {
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithError(@Nullable ConstraintViolationException exception) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), exception);
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
|
||||
@@ -286,18 +286,10 @@ public class FlowableUtils {
|
||||
|
||||
// start as many tasks as we have concurrency slots
|
||||
return collect.values().stream()
|
||||
.map(resolvedTasks -> filterCreated(resolvedTasks, taskRuns, parentTaskRun))
|
||||
.map(resolvedTasks -> resolveSequentialNexts(execution, resolvedTasks, null, null, parentTaskRun))
|
||||
.filter(resolvedTasks -> !resolvedTasks.isEmpty())
|
||||
.limit(concurrencySlots)
|
||||
.map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
|
||||
.toList();
|
||||
}
|
||||
|
||||
private static List<ResolvedTask> filterCreated(List<ResolvedTask> tasks, List<TaskRun> taskRuns, TaskRun parentTaskRun) {
|
||||
return tasks.stream()
|
||||
.filter(resolvedTask -> taskRuns.stream()
|
||||
.noneMatch(taskRun -> FlowableUtils.isTaskRunFor(resolvedTask, taskRun, parentTaskRun))
|
||||
)
|
||||
.map(resolvedTasks -> resolvedTasks.getFirst())
|
||||
.toList();
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.storages.StateStore;
|
||||
import io.kestra.core.storages.Storage;
|
||||
import io.kestra.core.storages.kv.KVStore;
|
||||
@@ -18,7 +19,7 @@ import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
public abstract class RunContext {
|
||||
public abstract class RunContext implements PropertyContext {
|
||||
|
||||
/**
|
||||
* Returns the trigger execution id attached to this context.
|
||||
|
||||
@@ -9,6 +9,7 @@ import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.plugins.PluginConfigurations;
|
||||
@@ -77,7 +78,7 @@ public class RunContextFactory {
|
||||
public RunContextInitializer initializer() {
|
||||
return applicationContext.getBean(RunContextInitializer.class);
|
||||
}
|
||||
|
||||
|
||||
public RunContext of(FlowInterface flow, Execution execution) {
|
||||
return of(flow, execution, Function.identity());
|
||||
}
|
||||
@@ -98,7 +99,7 @@ public class RunContextFactory {
|
||||
.withDecryptVariables(true)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
)
|
||||
.build(runContextLogger))
|
||||
.build(runContextLogger, PropertyContext.create(variableRenderer)))
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.build();
|
||||
}
|
||||
@@ -127,7 +128,7 @@ public class RunContextFactory {
|
||||
.withTaskRun(taskRun)
|
||||
.withDecryptVariables(decryptVariables)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.build(runContextLogger))
|
||||
.build(runContextLogger, PropertyContext.create(variableRenderer)))
|
||||
.withKvStoreService(kvStoreService)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.withTask(task)
|
||||
@@ -146,7 +147,7 @@ public class RunContextFactory {
|
||||
.withFlow(flow)
|
||||
.withTrigger(trigger)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.build(runContextLogger)
|
||||
.build(runContextLogger, PropertyContext.create(variableRenderer))
|
||||
)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.withTrigger(trigger)
|
||||
|
||||
@@ -4,15 +4,11 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import jakarta.validation.ConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.Validator;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@@ -27,12 +23,19 @@ public class RunContextProperty<T> {
|
||||
private final RunContext runContext;
|
||||
private final Task task;
|
||||
private final AbstractTrigger trigger;
|
||||
|
||||
private final boolean skipCache;
|
||||
|
||||
RunContextProperty(Property<T> property, RunContext runContext) {
|
||||
this(property, runContext, false);
|
||||
}
|
||||
|
||||
RunContextProperty(Property<T> property, RunContext runContext, boolean skipCache) {
|
||||
this.property = property;
|
||||
this.runContext = runContext;
|
||||
this.task = ((DefaultRunContext) runContext).getTask();
|
||||
this.trigger = ((DefaultRunContext) runContext).getTrigger();
|
||||
this.skipCache = skipCache;
|
||||
}
|
||||
|
||||
private void validate() {
|
||||
@@ -45,6 +48,19 @@ public class RunContextProperty<T> {
|
||||
log.trace("Unable to do validation: no task or trigger found");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new {@link RunContextProperty} that will always be rendered by evaluating
|
||||
* its original Pebble expression, without using any previously cached value.
|
||||
* <p>
|
||||
* This ensures that each time the property is rendered, the underlying
|
||||
* expression is re-evaluated to produce a fresh result.
|
||||
*
|
||||
* @return a new {@link Property} that bypasses the cache
|
||||
*/
|
||||
public RunContextProperty<T> skipCache() {
|
||||
return new RunContextProperty<>(this.property, this.runContext, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Render a property then convert it to its target type and validate it.<br>
|
||||
@@ -55,13 +71,13 @@ public class RunContextProperty<T> {
|
||||
* Warning, due to the caching mechanism, this method is not thread-safe.
|
||||
*/
|
||||
public Optional<T> as(Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz)));
|
||||
|
||||
validate();
|
||||
return as;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Render a property with additional variables, then convert it to its target type and validate it.<br>
|
||||
*
|
||||
@@ -71,7 +87,7 @@ public class RunContextProperty<T> {
|
||||
* Warning, due to the caching mechanism, this method is not thread-safe.
|
||||
*/
|
||||
public Optional<T> as(Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz, variables)));
|
||||
|
||||
validate();
|
||||
@@ -89,7 +105,7 @@ public class RunContextProperty<T> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <I> T asList(Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz)))
|
||||
.orElse((T) Collections.emptyList());
|
||||
|
||||
@@ -108,7 +124,7 @@ public class RunContextProperty<T> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <I> T asList(Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz, variables)))
|
||||
.orElse((T) Collections.emptyList());
|
||||
|
||||
@@ -127,7 +143,7 @@ public class RunContextProperty<T> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass)))
|
||||
.orElse((T) Collections.emptyMap());
|
||||
|
||||
@@ -146,11 +162,15 @@ public class RunContextProperty<T> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
var as = Optional.ofNullable(this.property)
|
||||
var as = Optional.ofNullable(getProperty())
|
||||
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass, variables)))
|
||||
.orElse((T) Collections.emptyMap());
|
||||
|
||||
validate();
|
||||
return as;
|
||||
}
|
||||
|
||||
private Property<T> getProperty() {
|
||||
return skipCache ? this.property.skipCache() : this.property;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
@@ -9,6 +10,7 @@ import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.flows.input.SecretInput;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
@@ -138,10 +140,10 @@ public final class RunVariables {
|
||||
* @param logger The {@link RunContextLogger logger}
|
||||
* @return The immutable map of variables.
|
||||
*/
|
||||
Map<String, Object> build(final RunContextLogger logger);
|
||||
Map<String, Object> build(RunContextLogger logger, PropertyContext propertyContext);
|
||||
}
|
||||
|
||||
public record KestraConfiguration(String environment, String url) { }
|
||||
public record KestraConfiguration(String environment, String url) { }
|
||||
|
||||
/**
|
||||
* Default builder class for constructing variables.
|
||||
@@ -174,7 +176,7 @@ public final class RunVariables {
|
||||
|
||||
// Note: for performance reason, cloning maps should be avoided as much as possible.
|
||||
@Override
|
||||
public Map<String, Object> build(final RunContextLogger logger) {
|
||||
public Map<String, Object> build(final RunContextLogger logger, final PropertyContext propertyContext) {
|
||||
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
|
||||
|
||||
builder.put("envs", envs != null ? envs : Map.of());
|
||||
@@ -280,9 +282,15 @@ public final class RunVariables {
|
||||
|
||||
if (flow != null && flow.getInputs() != null) {
|
||||
// we add default inputs value from the flow if not already set, this will be useful for triggers
|
||||
flow.getInputs().stream()
|
||||
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
|
||||
.forEach(input -> inputs.put(input.getId(), input.getDefaults()));
|
||||
flow.getInputs().stream()
|
||||
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
|
||||
.forEach(input -> {
|
||||
try {
|
||||
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
throw new RuntimeException("Unable to inject default value for input '" + input.getId() + "'", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (!inputs.isEmpty()) {
|
||||
|
||||
@@ -85,7 +85,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
|
||||
private QueueInterface<WorkerJob> workerJobQueue;
|
||||
private WorkerJobQueueInterface workerJobQueue;
|
||||
|
||||
@Inject
|
||||
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
|
||||
@@ -274,12 +274,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
}
|
||||
}));
|
||||
|
||||
this.receiveCancellations.addFirst(this.workerJobQueue.receive(
|
||||
this.receiveCancellations.addFirst(this.workerJobQueue.subscribe(
|
||||
this.id,
|
||||
this.workerGroup,
|
||||
Worker.class,
|
||||
either -> {
|
||||
pendingJobCount.incrementAndGet();
|
||||
|
||||
executorService.execute(() -> {
|
||||
pendingJobCount.decrementAndGet();
|
||||
runningJobCount.incrementAndGet();
|
||||
@@ -764,6 +763,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
|
||||
|
||||
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
|
||||
|
||||
this.workerTaskResultQueue.emit(workerTaskResult);
|
||||
|
||||
// upload the cache file, hash may not be present if we didn't succeed in computing it
|
||||
@@ -796,6 +796,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
// If it's a message too big, we remove the outputs
|
||||
failed = failed.withOutputs(Variables.empty());
|
||||
}
|
||||
if (e instanceof UnsupportedMessageException) {
|
||||
// we expect the offending char is in the output so we remove it
|
||||
failed = failed.withOutputs(Variables.empty());
|
||||
}
|
||||
WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
|
||||
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
|
||||
contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
|
||||
@@ -818,7 +822,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
|
||||
private Optional<String> hashTask(RunContext runContext, Task task) {
|
||||
try {
|
||||
var map = JacksonMapper.toMap(task);
|
||||
var rMap = runContext.render(map);
|
||||
// If there are task provided variables, rendering the task may fail.
|
||||
// The best we can do is to add a fake 'workingDir' as it's an often added variables,
|
||||
// and it should not be part of the task hash.
|
||||
Map<String, Object> variables = Map.of("workingDir", "workingDir");
|
||||
var rMap = runContext.render(map, variables);
|
||||
var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
|
||||
MessageDigest digest = MessageDigest.getInstance("SHA-256");
|
||||
digest.update(json);
|
||||
|
||||
@@ -17,4 +17,6 @@ public interface WorkerJobRunningStateStore {
|
||||
* @param key the key of the worker job to be deleted.
|
||||
*/
|
||||
void deleteByKey(String key);
|
||||
|
||||
void put(WorkerJobRunning workerJobRunning);
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ public class WorkerTaskCallable extends AbstractWorkerCallable {
|
||||
@Getter
|
||||
Output taskOutput;
|
||||
|
||||
WorkerTaskCallable(WorkerTask workerTask, RunnableTask<?> task, RunContext runContext, MetricRegistry metricRegistry) {
|
||||
public WorkerTaskCallable(WorkerTask workerTask, RunnableTask<?> task, RunContext runContext, MetricRegistry metricRegistry) {
|
||||
super(runContext, task.getClass().getName(), workerTask.uid(), task.getClass().getClassLoader());
|
||||
this.workerTask = workerTask;
|
||||
this.task = task;
|
||||
|
||||
@@ -15,7 +15,7 @@ public class WorkerTriggerCallable extends AbstractWorkerTriggerCallable {
|
||||
@Getter
|
||||
Optional<Execution> evaluate;
|
||||
|
||||
WorkerTriggerCallable(RunContext runContext, WorkerTrigger workerTrigger, PollingTriggerInterface pollingTrigger) {
|
||||
public WorkerTriggerCallable(RunContext runContext, WorkerTrigger workerTrigger, PollingTriggerInterface pollingTrigger) {
|
||||
super(runContext, pollingTrigger.getClass().getName(), workerTrigger);
|
||||
this.pollingTrigger = pollingTrigger;
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ public class WorkerTriggerRealtimeCallable extends AbstractWorkerTriggerCallable
|
||||
Consumer<? super Throwable> onError;
|
||||
Consumer<Execution> onNext;
|
||||
|
||||
WorkerTriggerRealtimeCallable(
|
||||
public WorkerTriggerRealtimeCallable(
|
||||
RunContext runContext,
|
||||
WorkerTrigger workerTrigger,
|
||||
RealtimeTriggerInterface realtimeTrigger,
|
||||
|
||||
@@ -102,6 +102,19 @@ public abstract class AbstractDate {
|
||||
}
|
||||
|
||||
if (value instanceof Long longValue) {
|
||||
if(value.toString().length() == 13) {
|
||||
return Instant.ofEpochMilli(longValue).atZone(zoneId);
|
||||
}else if(value.toString().length() == 19 ){
|
||||
if(value.toString().endsWith("000")){
|
||||
long seconds = longValue/1_000_000_000;
|
||||
int nanos = (int) (longValue%1_000_000_000);
|
||||
return Instant.ofEpochSecond(seconds,nanos).atZone(zoneId);
|
||||
}else{
|
||||
long milliseconds = longValue/1_000_000;
|
||||
int micros = (int) (longValue%1_000_000);
|
||||
return Instant.ofEpochMilli(milliseconds).atZone(zoneId).withNano(micros*1000);
|
||||
}
|
||||
}
|
||||
return Instant.ofEpochSecond(longValue).atZone(zoneId);
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.kestra.core.events.CrudEventType;
|
||||
import io.kestra.core.exceptions.DeserializationException;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
@@ -29,10 +30,7 @@ import io.kestra.core.server.Service;
|
||||
import io.kestra.core.server.ServiceStateChangeEvent;
|
||||
import io.kestra.core.server.ServiceType;
|
||||
import io.kestra.core.services.*;
|
||||
import io.kestra.core.utils.Await;
|
||||
import io.kestra.core.utils.Either;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.*;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||
import io.micronaut.core.util.CollectionUtils;
|
||||
@@ -91,7 +89,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
private volatile Boolean isReady = false;
|
||||
|
||||
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
private ScheduledFuture<?> executionMonitorFuture;
|
||||
|
||||
@Getter
|
||||
protected SchedulerTriggerStateInterface triggerState;
|
||||
@@ -152,7 +152,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
this.flowListeners.run();
|
||||
this.flowListeners.listen(this::initializedTriggers);
|
||||
|
||||
ScheduledFuture<?> evaluationLoop = scheduleExecutor.scheduleAtFixedRate(
|
||||
scheduledFuture = scheduleExecutor.scheduleAtFixedRate(
|
||||
this::handle,
|
||||
0,
|
||||
1,
|
||||
@@ -162,10 +162,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
// look at exception on the evaluation loop thread
|
||||
Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
|
||||
() -> {
|
||||
Await.until(evaluationLoop::isDone);
|
||||
Await.until(scheduledFuture::isDone);
|
||||
|
||||
try {
|
||||
evaluationLoop.get();
|
||||
scheduledFuture.get();
|
||||
} catch (CancellationException ignored) {
|
||||
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
@@ -177,7 +177,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
);
|
||||
|
||||
// Periodically report metrics and logs of running executions
|
||||
ScheduledFuture<?> monitoringLoop = executionMonitorExecutor.scheduleWithFixedDelay(
|
||||
executionMonitorFuture = executionMonitorExecutor.scheduleWithFixedDelay(
|
||||
this::executionMonitor,
|
||||
30,
|
||||
10,
|
||||
@@ -187,10 +187,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
// look at exception on the monitoring loop thread
|
||||
Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
|
||||
() -> {
|
||||
Await.until(monitoringLoop::isDone);
|
||||
Await.until(executionMonitorFuture::isDone);
|
||||
|
||||
try {
|
||||
monitoringLoop.get();
|
||||
executionMonitorFuture.get();
|
||||
} catch (CancellationException ignored) {
|
||||
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
@@ -318,7 +318,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
}
|
||||
|
||||
synchronized (this) { // we need a sync block as we read then update so we should not do it in multiple threads concurrently
|
||||
List<Trigger> triggers = triggerState.findAllForAllTenants();
|
||||
Map<String, Trigger> triggers = triggerState.findAllForAllTenants().stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
|
||||
|
||||
flows
|
||||
.stream()
|
||||
@@ -328,7 +328,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.flatMap(flow -> flow.getTriggers().stream().filter(trigger -> trigger instanceof WorkerTriggerInterface).map(trigger -> new FlowAndTrigger(flow, trigger)))
|
||||
.distinct()
|
||||
.forEach(flowAndTrigger -> {
|
||||
Optional<Trigger> trigger = triggers.stream().filter(t -> t.uid().equals(Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger()))).findFirst(); // must have one or none
|
||||
String triggerUid = Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger());
|
||||
Optional<Trigger> trigger = Optional.ofNullable(triggers.get(triggerUid));
|
||||
if (trigger.isEmpty()) {
|
||||
RunContext runContext = runContextFactory.of(flowAndTrigger.flow(), flowAndTrigger.trigger());
|
||||
ConditionContext conditionContext = conditionService.conditionContext(runContext, flowAndTrigger.flow(), null);
|
||||
@@ -467,9 +468,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
private List<FlowWithTriggers> computeSchedulable(List<FlowWithSource> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
|
||||
List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
|
||||
List<String> flowIds = flows.stream().map(FlowId::uidWithoutRevision).toList();
|
||||
Map<String, Trigger> triggerById = triggerContextsToEvaluate.stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
|
||||
|
||||
// delete trigger which flow has been deleted
|
||||
triggerContextsToEvaluate.stream()
|
||||
.filter(trigger -> !flows.stream().map(FlowId::uidWithoutRevision).toList().contains(FlowId.uid(trigger)))
|
||||
.filter(trigger -> !flowIds.contains(FlowId.uid(trigger)))
|
||||
.forEach(trigger -> {
|
||||
try {
|
||||
this.triggerState.delete(trigger);
|
||||
@@ -491,12 +495,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
.map(abstractTrigger -> {
|
||||
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
|
||||
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
|
||||
Trigger triggerContext = null;
|
||||
Trigger lastTrigger = triggerContextsToEvaluate
|
||||
.stream()
|
||||
.filter(triggerContextToFind -> triggerContextToFind.uid().equals(Trigger.uid(flow, abstractTrigger)))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
Trigger triggerContext;
|
||||
Trigger lastTrigger = triggerById.get(Trigger.uid(flow, abstractTrigger));
|
||||
// If a trigger is not found in triggers to evaluate, then we ignore it
|
||||
if (lastTrigger == null) {
|
||||
return null;
|
||||
@@ -1006,8 +1006,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
|
||||
|
||||
setState(ServiceState.TERMINATING);
|
||||
this.receiveCancellations.forEach(Runnable::run);
|
||||
this.scheduleExecutor.shutdown();
|
||||
this.executionMonitorExecutor.shutdown();
|
||||
ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));
|
||||
ExecutorsUtils.closeScheduledThreadPool(executionMonitorExecutor, Duration.ofSeconds(5), List.of(executionMonitorFuture));
|
||||
try {
|
||||
if (onClose != null) {
|
||||
onClose.run();
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.server;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.utils.ExecutorsUtils;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
@@ -8,9 +9,11 @@ import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
@@ -25,6 +28,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
|
||||
protected final ServerConfig serverConfig;
|
||||
private final AtomicBoolean isStopped = new AtomicBoolean(false);
|
||||
private ScheduledExecutorService scheduledExecutorService;
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
private Instant lastScheduledExecution;
|
||||
|
||||
/**
|
||||
@@ -98,7 +102,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
|
||||
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
|
||||
Duration scheduleInterval = getScheduleInterval();
|
||||
log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval);
|
||||
scheduledExecutorService.scheduleAtFixedRate(
|
||||
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
|
||||
this,
|
||||
0,
|
||||
scheduleInterval.toSeconds(),
|
||||
@@ -133,20 +137,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
|
||||
@Override
|
||||
public void close() {
|
||||
if (isStopped.compareAndSet(false, true) && scheduledExecutorService != null) {
|
||||
scheduledExecutorService.shutdown();
|
||||
if (scheduledExecutorService.isTerminated()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Timeout", name);
|
||||
}
|
||||
log.debug("Stopped scheduled '{}' task.", name);
|
||||
} catch (InterruptedException e) {
|
||||
scheduledExecutorService.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Interrupted.", name);
|
||||
}
|
||||
ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(scheduledFuture));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ import java.util.stream.Collectors;
|
||||
/**
|
||||
* Runtime information about a Kestra's service (e.g., WORKER, EXECUTOR, etc.).
|
||||
*
|
||||
* @param uid The service unique identifier.
|
||||
* @param uid The service unique identifier.
|
||||
* @param type The service type.
|
||||
* @param state The state of the service.
|
||||
* @param server The server running this service.
|
||||
|
||||
@@ -250,9 +250,10 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
|
||||
stateLock.lock();
|
||||
// Optional callback to be executed at the end.
|
||||
Runnable returnCallback = null;
|
||||
|
||||
localServiceState = localServiceState(service);
|
||||
try {
|
||||
localServiceState = localServiceState(service);
|
||||
|
||||
|
||||
if (localServiceState == null) {
|
||||
return null; // service has been unregistered.
|
||||
}
|
||||
@@ -301,7 +302,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
|
||||
// Update the local instance
|
||||
this.serviceRegistry.register(localServiceState.with(remoteInstance));
|
||||
} catch (Exception e) {
|
||||
final ServiceInstance localInstance = localServiceState(service).instance();
|
||||
final ServiceInstance localInstance = localServiceState.instance();
|
||||
log.error("[Service id={}, type='{}', hostname='{}'] Failed to update state to {}. Error: {}",
|
||||
localInstance.uid(),
|
||||
localInstance.type(),
|
||||
@@ -317,7 +318,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
|
||||
returnCallback.run();
|
||||
}
|
||||
}
|
||||
return localServiceState(service).instance();
|
||||
return Optional.ofNullable(localServiceState(service)).map(LocalServiceState::instance).orElse(null);
|
||||
}
|
||||
|
||||
private void mayDisableStateUpdate(final Service service, final ServiceInstance instance) {
|
||||
@@ -371,9 +372,11 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
|
||||
final Service service,
|
||||
final ServiceInstance instance,
|
||||
final boolean isLivenessEnabled) {
|
||||
// Never shutdown STANDALONE server or WEB_SERVER service.
|
||||
if (instance.server().type().equals(ServerInstance.Type.STANDALONE) ||
|
||||
instance.is(ServiceType.WEBSERVER)) {
|
||||
// Never shutdown STANDALONE server or WEBSERVER and INDEXER services.
|
||||
if (ServerInstance.Type.STANDALONE.equals(instance.server().type()) ||
|
||||
instance.is(ServiceType.INDEXER) ||
|
||||
instance.is(ServiceType.WEBSERVER)
|
||||
) {
|
||||
// Force the RUNNING state.
|
||||
return Optional.of(instance.state(Service.ServiceState.RUNNING, now, null));
|
||||
}
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
package io.kestra.core.server;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import io.kestra.core.utils.Enums;
|
||||
|
||||
/**
|
||||
* Supported Kestra's service types.
|
||||
*/
|
||||
@@ -9,4 +12,16 @@ public enum ServiceType {
|
||||
SCHEDULER,
|
||||
WEBSERVER,
|
||||
WORKER,
|
||||
WORKER_AGENT,
|
||||
CONTROLLER,
|
||||
INVALID;
|
||||
|
||||
@JsonCreator
|
||||
public static ServiceType fromString(final String value) {
|
||||
try {
|
||||
return Enums.getForNameIgnoreCase(value, ServiceType.class, INVALID);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return INVALID;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,22 +0,0 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
import io.micronaut.scheduling.annotation.Scheduled;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
@Requires(property = "kestra.anonymous-usage-report.enabled", value = "true")
|
||||
@Requires(property = "kestra.server-type")
|
||||
public class CollectorScheduler {
|
||||
@Inject
|
||||
protected CollectorService collectorService;
|
||||
|
||||
@Scheduled(initialDelay = "${kestra.anonymous-usage-report.initial-delay}", fixedDelay = "${kestra.anonymous-usage-report.fixed-delay}")
|
||||
public void report() {
|
||||
collectorService.report();
|
||||
}
|
||||
}
|
||||
@@ -1,220 +0,0 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.ServerType;
|
||||
import io.kestra.core.models.collectors.*;
|
||||
import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.VersionProvider;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.micronaut.core.type.Argument;
|
||||
import io.micronaut.http.HttpRequest;
|
||||
import io.micronaut.http.MutableHttpRequest;
|
||||
import io.micronaut.http.client.annotation.Client;
|
||||
import io.micronaut.http.client.exceptions.HttpClientResponseException;
|
||||
import io.micronaut.http.hateoas.JsonError;
|
||||
import io.micronaut.reactor.http.client.ReactorHttpClient;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class CollectorService {
|
||||
protected static final String UUID = IdUtils.create();
|
||||
|
||||
@Inject
|
||||
@Client
|
||||
protected ReactorHttpClient client;
|
||||
|
||||
@Inject
|
||||
protected ApplicationContext applicationContext;
|
||||
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
private ExecutionRepositoryInterface executionRepository;
|
||||
|
||||
@Inject
|
||||
protected InstanceService instanceService;
|
||||
|
||||
@Inject
|
||||
protected VersionProvider versionProvider;
|
||||
|
||||
@Inject
|
||||
protected PluginRegistry pluginRegistry;
|
||||
|
||||
@Nullable
|
||||
@Value("${kestra.server-type}")
|
||||
protected ServerType serverType;
|
||||
|
||||
@Nullable
|
||||
@Value("${kestra.url:}")
|
||||
protected String kestraUrl;
|
||||
|
||||
@Value("${kestra.anonymous-usage-report.uri}")
|
||||
protected URI url;
|
||||
|
||||
@Inject
|
||||
private ServiceInstanceRepositoryInterface serviceRepository;
|
||||
|
||||
@Inject
|
||||
private MetricRegistry metricRegistry;
|
||||
|
||||
private transient Usage defaultUsage;
|
||||
|
||||
protected synchronized Usage defaultUsage() {
|
||||
boolean first = defaultUsage == null;
|
||||
|
||||
if (first) {
|
||||
defaultUsage = Usage.builder()
|
||||
.startUuid(UUID)
|
||||
.instanceUuid(instanceService.fetch())
|
||||
.serverType(serverType)
|
||||
.version(versionProvider.getVersion())
|
||||
.zoneId(ZoneId.systemDefault())
|
||||
.uri(kestraUrl == null ? null : kestraUrl)
|
||||
.environments(applicationContext.getEnvironment().getActiveNames())
|
||||
.startTime(Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime()))
|
||||
.host(HostUsage.of())
|
||||
.configurations(ConfigurationUsage.of(applicationContext))
|
||||
.plugins(PluginUsage.of(pluginRegistry))
|
||||
.build();
|
||||
}
|
||||
|
||||
return defaultUsage;
|
||||
}
|
||||
|
||||
public Usage metrics(boolean details) {
|
||||
return metrics(details, serverType == ServerType.WORKER || serverType == ServerType.SCHEDULER || serverType == ServerType.STANDALONE);
|
||||
}
|
||||
|
||||
public Usage metrics(boolean details, boolean metrics) {
|
||||
ZonedDateTime to = ZonedDateTime.now();
|
||||
|
||||
ZonedDateTime from = to
|
||||
.toLocalDate()
|
||||
.atStartOfDay(ZoneId.systemDefault())
|
||||
.minusDays(1);
|
||||
|
||||
return metrics(details, metrics, from, to);
|
||||
}
|
||||
|
||||
public Usage metrics(boolean details, boolean metrics, ZonedDateTime from, ZonedDateTime to) {
|
||||
Usage.UsageBuilder<?, ?> builder = defaultUsage()
|
||||
.toBuilder()
|
||||
.uuid(IdUtils.create());
|
||||
|
||||
if (details) {
|
||||
builder = builder
|
||||
.flows(FlowUsage.of(flowRepository))
|
||||
.executions(ExecutionUsage.of(executionRepository, from, to))
|
||||
.services(ServiceUsage.of(from.toInstant(), to.toInstant(), serviceRepository, Duration.ofMinutes(5)));
|
||||
}
|
||||
|
||||
if (metrics) {
|
||||
builder = builder.pluginMetrics(pluginMetrics());
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public void report() {
|
||||
try {
|
||||
Usage metrics = this.metrics(serverType == ServerType.EXECUTOR || serverType == ServerType.STANDALONE);
|
||||
MutableHttpRequest<Usage> post = this.request(metrics);
|
||||
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Report anonymous usage: '{}'", JacksonMapper.ofJson().writeValueAsString(metrics));
|
||||
}
|
||||
|
||||
Result result = client.toBlocking()
|
||||
.retrieve(
|
||||
post,
|
||||
Argument.of(Result.class),
|
||||
Argument.of(JsonError.class)
|
||||
);
|
||||
this.handleResponse(result);
|
||||
} catch (HttpClientResponseException t) {
|
||||
log.debug("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
|
||||
} catch (Exception t) {
|
||||
log.debug("Unable to handle anonymous usage", t);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleResponse(Result result) {
|
||||
|
||||
}
|
||||
|
||||
protected MutableHttpRequest<Usage> request(Usage metrics) throws Exception {
|
||||
return HttpRequest.POST(this.url, metrics)
|
||||
.header("User-Agent", "Kestra/" + versionProvider.getVersion());
|
||||
}
|
||||
|
||||
private List<PluginMetric> pluginMetrics() {
|
||||
List<PluginMetric> taskMetrics = pluginRegistry.plugins().stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getTasks().stream())
|
||||
.map(cls -> cls.getName())
|
||||
.map(type -> taskMetric(type))
|
||||
.filter(opt -> opt.isPresent())
|
||||
.map(opt -> opt.get())
|
||||
.toList();
|
||||
|
||||
List<PluginMetric> triggerMetrics = pluginRegistry.plugins().stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream())
|
||||
.map(cls -> cls.getName())
|
||||
.map(type -> triggerMetric(type))
|
||||
.filter(opt -> opt.isPresent())
|
||||
.map(opt -> opt.get())
|
||||
.toList();
|
||||
|
||||
return ListUtils.concat(taskMetrics, triggerMetrics);
|
||||
}
|
||||
|
||||
private Optional<PluginMetric> taskMetric(String type) {
|
||||
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_ENDED_DURATION).tag(MetricRegistry.TAG_TASK_TYPE, type).timer();
|
||||
return fromTimer(type, duration);
|
||||
}
|
||||
|
||||
private Optional<PluginMetric> triggerMetric(String type) {
|
||||
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
|
||||
|
||||
if (duration == null) {
|
||||
// this may be because this is a trigger executed by the scheduler, we search there instead
|
||||
duration = metricRegistry.find(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
|
||||
}
|
||||
return fromTimer(type, duration);
|
||||
}
|
||||
|
||||
private Optional<PluginMetric> fromTimer(String type, Timer timer) {
|
||||
if (timer == null || timer.count() == 0) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
double count = timer.count();
|
||||
double totalTime = timer.totalTime(TimeUnit.MILLISECONDS);
|
||||
double meanTime = timer.mean(TimeUnit.MILLISECONDS);
|
||||
|
||||
return Optional.of(new PluginMetric(type, count, totalTime, meanTime));
|
||||
}
|
||||
}
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user