Compare commits

...

115 Commits

Author SHA1 Message Date
YannC.
747c424f1f chore(version): upgrade to v0.23.6 2025-07-15 14:52:54 +02:00
brian-mulier-p
33bfc979c5 fix(core): trim expressions in select & multiselect to be able to use '|' instead of '>-' (#10017)
closes #10016
2025-07-09 16:39:02 +02:00
nKwiatkowski
58ceb66cfb chore(version): upgrade to v0.23.5 2025-07-08 15:18:25 +02:00
Loïc Mathieu
a08266593f fix(webserver)*: bulk set labels remove existing labels
Fixes #9764
2025-07-07 15:26:09 +02:00
Loïc Mathieu
d5d5f457b4 fix(system): force running after execution tasks even if the execution is killed
Fixes #9852
2025-07-07 12:41:31 +02:00
François Delbrayelle
cacac2239d fix(taskrunner): abstract task runner (#9769) 2025-07-04 09:50:08 +02:00
nKwiatkowski
5c45bd5eb5 feat(cicd): #4006 add javadoc and sources to cli release 2025-07-03 14:58:12 +02:00
Miloš Paunović
fdf126202c fix(namespaces)*: take pagination into account when browsing namespace flows (#9849)
Closes https://github.com/kestra-io/kestra/issues/9805.
2025-07-02 11:48:53 +02:00
Nicolas K.
0f3c745bb9 feat(cicd): #4006 change signing method (#9854)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-02 11:14:07 +02:00
YannC.
5a6a0ff3e3 chore(version): upgrade to v0.23.4 2025-07-01 17:56:42 +02:00
Loïc Mathieu
f5f88e18ce feat(cluster): persist maintenance mode in the database
Part-of: https://github.com/kestra-io/kestra-ee/issues/3735
2025-07-01 17:56:42 +02:00
Nicolas K.
12f521860e feat(cicd): #4006 migrate to maven central (#9807)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-01 16:19:11 +02:00
Nicolas K.
b6cf3e1f93 feat(cicd): #4006 migrate sonatype to maven central (#9803)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-01 15:02:05 +02:00
YannC.
7125885ea9 fix(triggers): correctly replace the update triggers when disabling 2025-07-01 14:21:22 +02:00
YannC.
0b29a4a247 feat(triggers): avoid clearing selection when refreshing in triggers list 2025-07-01 14:21:22 +02:00
Piyush Bhaskar
0377f87c66 feat(tenant): all routes on /main tenant 2025-07-01 11:57:13 +05:30
Loïc Mathieu
06bd0c6380 fix(system)*: mitigate possible deadlock for execution delay and SLA
If multiple instances of the executor are started, the execution delay loop and the SLA monitoring loop risk duplicating the execution resume or the execution SLA violation computation.
This could create race conditions and duplicate execution updates.
It may also create deadlocks, as two executor instances may try to lock the same execution to restart it (or fail it due to the SLA).
2025-06-30 14:33:54 +02:00
brian.mulier
cd39995f24 fix(core): use namespace prefix instead of equals
On the namespace/flows and namespace/executions pages, and when a default namespace is set on the Logs page

closes kestra-io/kestra-ee#4200
2025-06-25 17:48:54 +02:00
Loïc Mathieu
938e156bd5 chore(system): call the close runnable later 2025-06-25 14:37:46 +02:00
brian.mulier
1fb7943738 chore(version): update to version '0.23.3' 2025-06-24 17:33:04 +02:00
brian-mulier-p
09d648cf86 fix(variables): put fixture files under an arbitrary key and extract them back as a root-level "files" variable (#9689) 2025-06-24 17:32:37 +02:00
brian.mulier
02a22faed4 chore(version): update to version '0.23.2' 2025-06-24 14:19:20 +02:00
Ludovic DEHON
169d6610f5 test(core): fix failing test on schedule 2025-06-24 14:19:20 +02:00
Loïc Mathieu
e253958cf4 fix(system): possible NPE on trigger when computing variables 2025-06-24 14:19:20 +02:00
brian-mulier-p
c75f06a036 fix: avoid failure to deserialize json objects that have unknown fields with http client (#9668)
closes #9667
2025-06-24 14:19:20 +02:00
Loïc Mathieu
b3b1b7a5cb feat(executions)*: add tasks to set and unset execution variables
Closes #9555
2025-06-24 14:19:20 +02:00
Loïc Mathieu
34e07b9e2b fix(execution): parent flow never ends when subflow fail due to SLA
This is because the executor didn't have the flow inside it, so the execution was not correctly terminated.
It may fix other issues (like flow triggers, purge, ...)

Fixes #9618
2025-06-20 18:04:12 +02:00
Loïc Mathieu
85b449c926 fix(system): flow graph fails to be created while editing a flow
Fixes #9551

It is not the validation per se that fails; it's the graph dependency computation, which is also done while editing a flow, that fails.
2025-06-20 12:09:18 +02:00
Loïc Mathieu
0017ead9b3 fix(system)*: runIf inside a WorkingDirectory can crash the Worker
Fixes #9639
2025-06-20 12:09:04 +02:00
Barthélémy Ledoux
b0292f02f7 fix(ui): default value for expression cannot be null (#9636) 2025-06-20 11:12:32 +02:00
Piyush Bhaskar
202dc7308d feat(namespaces): show ns description (#9610)
* feat(namespaces): show ns description

* add slot and data for description
2025-06-20 13:59:03 +05:30
François Delbrayelle
3273a9a40c fix(plugin-versioning): replace current JAR if more recent (#9629) 2025-06-20 09:51:21 +02:00
Loïc Mathieu
bd303f4529 fix(system): support allowFailure and allowWarning for the Pause task
Fixes #9416
2025-06-19 17:34:38 +02:00
Barthélémy Ledoux
db57326f0f tests: nocode editor (#9624) 2025-06-19 14:21:15 +02:00
github-actions[bot]
90a576490f chore(version): update to version '0.23.1' 2025-06-19 10:32:53 +00:00
Loïc Mathieu
2cdd968100 feat(system): store version in the settings 2025-06-19 12:23:20 +02:00
Barthélémy Ledoux
adfc3bf526 perf(ui): load a sample schema while waiting (#9558) 2025-06-19 11:34:15 +02:00
Nicolas K.
3a61f9b1ba Fix/tutorial flows with migration (#9620)
* fix(core): #9609 delete tutorial flows and triggers before migrating the database

* fix(core): #9609 delete tutorial flows and triggers before migrating the database for EE version

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-06-19 10:58:29 +02:00
YannC
64e3014426 fix: correctly use default tenant when synchronizing file with local (#9605)
close #9568
2025-06-19 10:04:58 +02:00
François Delbrayelle
1f68e5f4ed fix(podman): do not pass the tag directly to pullImageCmd (withTag) (#9607) 2025-06-18 18:50:54 +02:00
François Delbrayelle
9bfa888e36 fix(plugin): FileSystems.newFileSystem caused a Path component should be / in plugins tests (#9570) 2025-06-18 16:03:45 +02:00
github-actions[bot]
691a77538a chore(version): update to version '0.23.0' 2025-06-17 09:35:23 +00:00
Bart Ledoux
b07086f553 chore: update ui-libs 2025-06-17 11:21:21 +02:00
Ludovic DEHON
ee12c884e9 fix(tasks): sleep examples are full ones 2025-06-16 15:02:34 +02:00
Barthélémy Ledoux
712d6da84f fix(ui): make file panel appear beside main panel in namespace (#9546) 2025-06-16 14:45:05 +02:00
Bart Ledoux
fcc5fa2056 fix: package-lock 2025-06-16 14:44:01 +02:00
Loïc Mathieu
dace30ded7 fix(system): compilation issue 2025-06-16 14:18:55 +02:00
github-actions[bot]
2b578f0f94 chore(version): update to version '0.23.0-rc5-SNAPSHOT' 2025-06-16 12:05:27 +00:00
Florian Hussonnois
91f958b26b fix(executor): delete WorkerJobRunning for any terminated task (#9493)
Make ExecutorService responsible for deleting WorkerJobRunning
when a terminated TaskRun is added to an execution.

Changes:
 - Remove unnecessary read before delete on the WorkerJobRunning table.

Close: #9493
2025-06-16 14:03:11 +02:00
Bart Ledoux
d7fc6894fe tests: fix storybook tests 2025-06-16 13:29:34 +02:00
Bart Ledoux
c286348d27 fix(ui): make array and KV Pairs work in nocode 2025-06-16 12:17:23 +02:00
brian.mulier
de4ec49721 fix(core): yaml utils migration 2025-06-16 11:18:47 +02:00
Barthélémy Ledoux
1966ac6012 fix: cleanup empty metadata to fix variable creation (#9529) 2025-06-16 11:17:52 +02:00
Barthélémy Ledoux
a293a37ec9 fix(ui): nocode API calls on EE needs tenant (#9527) 2025-06-16 11:17:43 +02:00
Barthélémy Ledoux
f295724bb6 fix: small tweaks on tabs (#9520) 2025-06-16 11:17:34 +02:00
Barthélémy Ledoux
06505ad977 fix(ui): snafu on duplicate input pair (#9514) 2025-06-16 11:15:30 +02:00
Barthélémy Ledoux
cb31ef642f fix(ui): [nocode] make dag tasks work (#9506) 2025-06-16 11:14:17 +02:00
Barthélémy Ledoux
c320323371 fix(ui): nocode updating inputs from yaml (#9430) 2025-06-16 11:12:35 +02:00
Barthélémy Ledoux
a190cdd0e7 fix(ui): add datepicker to nocode string field (#9351)
Co-authored-by: GitHub Action <actions@github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2025-06-16 11:12:27 +02:00
Barthélémy Ledoux
0678f7c5e9 fix(ui): rename namespace field (#9492) 2025-06-16 11:08:05 +02:00
Barthélémy Ledoux
f39ba5c95e fix(ui): prevent cursor change in Editor component when modelValue is updated from outside (#9371) 2025-06-16 11:07:55 +02:00
Karuna Tata
b4e334c5d8 feat(ui): drag and convert tabs to panels (#9198)
Co-authored-by: Barthélémy Ledoux <bledoux@kestra.io>
2025-06-16 11:07:37 +02:00
Bart Ledoux
561380c942 fix(ui): restore add button as a button 2025-06-16 11:07:25 +02:00
Satvik Kushwaha
68b4867b5a fix(ui): make download and preview visible for text outputs (#8348)
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
2025-06-16 11:06:24 +02:00
Barthélémy Ledoux
cb7f99d107 fix(ui): variables should work with duplicated keys (#9425) 2025-06-16 11:05:17 +02:00
Barthélémy Ledoux
efac7146ff fix: properly detect condition fields (#9353) 2025-06-16 11:02:41 +02:00
Barthélémy Ledoux
11de42c0b8 fix(ui): nocode - open onPause in a new tab (#9366) 2025-06-16 11:02:31 +02:00
Barthélémy Ledoux
b58d9e10dd fix: initialize array fields without any value (#9367) 2025-06-16 11:00:04 +02:00
Barthélémy Ledoux
e25e70d37e refactor: load nocode root form from server schema (#9327) 2025-06-16 10:59:53 +02:00
Karuna Tata
f2dac28997 fix(ui): clear selection of retry form radio buttons (#9268)
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
thank you so much for this great work! ❤️
2025-06-16 10:59:44 +02:00
Barthélémy Ledoux
0ac8819d95 fix(ui): allow key of sub-tasks to be other than tasks (#9333) 2025-06-16 10:59:24 +02:00
Ludovic DEHON
d261de0df3 fix(core): robots.txt was not served
close kestra-io/kestra#9015
2025-06-13 23:01:48 +02:00
brian.mulier
02cac65614 fix(core): filters were triggering endless refresh
closes #9508
2025-06-13 16:25:34 +02:00
MilosPaunovic
5064687b7e fix(core)*: make sure tour always opens with code & topology tabs visible (#9513)
Closes https://github.com/kestra-io/kestra-ee/issues/4073.
2025-06-13 08:55:20 +02:00
YannC
7c8419b266 fix(ui): Better duplicate key management in the pair component (#9431)
* fix(ui): Better duplicate key management in the pair component

close #9220

* fix(ui): add a have-error prop on inputText that shows a red shadow

* refactor: simplify inputpair component (#9491)

* fix: only show lock if disabled

* alertState define order

---------

Co-authored-by: Barthélémy Ledoux <bledoux@kestra.io>
2025-06-12 13:28:02 +02:00
Roman Acevedo
84e4c62c6d fix(tests): test editor was showing the previously shown plugin doc
fixes https://github.com/kestra-io/kestra-ee/issues/4066
2025-06-12 13:21:29 +02:00
Nicolas K.
9aa605e23b Feat/rework compatibility layer (#9490)
* feat(core): rework compatibility layer

* feat(core): #4062 rework compatibility layer

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-06-12 10:42:49 +02:00
Roman Acevedo
faa77aed79 feat(tests): add execution url in test result 2025-06-12 10:03:05 +02:00
brian-mulier-p
fdce552528 feat(core): introduce tasksWithState autocompletion (#9485)
part of #8350
2025-06-12 09:55:57 +02:00
brian.mulier
a028a61792 fix(core): avoid infinite load upon route redirect (#9480)
closes #9479
2025-06-11 17:03:52 +02:00
brian.mulier
023a77a320 fix(core): properly map labels filters from query (#9480)
closes #9324
2025-06-11 17:03:52 +02:00
brian.mulier
bfee04bca2 fix(core): prevent incompatible timeRange & start/endDate filters + prevent multiple scope filters (#9480)
closes #9240
2025-06-11 17:03:52 +02:00
YannC
3756f01bdf fix(ui): base the required prop on the requiredProperties list (#9433)
close #9377
2025-06-11 13:09:27 +02:00
YannC
c1240d7391 feat(ui): allow closing a tab with a mouse middle click, like in a browser/IDE (#9434) 2025-06-11 08:55:13 +02:00
YannC
ac37ae6032 fix(core): use Min annotation instead of Positive (#9432)
close #9380
2025-06-10 17:15:11 +02:00
github-actions[bot]
9e51b100b0 chore(version): update to version '0.23.0-rc3-SNAPSHOT' 2025-06-10 12:51:54 +00:00
Miloš Paunović
bc81e01608 fix(core)*: properly display chart colors for logs (#9429) 2025-06-10 13:51:56 +02:00
YannC.
9f2162c942 feat(): add Kestra plugin in the list 2025-06-10 12:44:09 +02:00
brian-mulier-p
97992d99ee fix(core): properly handle dots in nested keys & commas in quoted filter values (#9410) 2025-06-10 11:55:30 +02:00
brian.mulier
f90f6b8429 chore(deps): bump vitest to 3.2.3 2025-06-10 11:55:30 +02:00
brian.mulier
0f7360ae81 build(tests): replace workspaces with proper storybook config + working aliases 2025-06-10 11:53:11 +02:00
Florian Hussonnois
938590f31f fix(plugins): check whether plugin registry support versioning (#9122) 2025-06-10 11:49:40 +02:00
YannC.
b2d1c84a86 fix(): display correctly doc/chart preview when editing custom dashboard
close #9411
2025-06-10 10:25:41 +02:00
Ludovic DEHON
d7ca302830 feat(system): add server_type as global metrics tags 2025-06-10 09:23:14 +02:00
Roman Acevedo
8656e852cc build(ci): fix setversion workflow not making tag push trigger main 2025-06-09 18:03:49 +02:00
brian-mulier-p
cc72336350 fix(core): avoid adding invalid keys from query parameters to filter (#9383)
closes #9364
2025-06-09 18:03:49 +02:00
Roman Acevedo
316d89764e tests(core): add storybook on executions filters (#9354) 2025-06-09 18:03:49 +02:00
Barthélémy Ledoux
4873bf4d36 chore: upgrade storybook (#9326) 2025-06-09 14:40:21 +02:00
Florian Hussonnois
204bf7f5e1 chore: add script to update gradle kestraVersion prop on plugins 2025-06-09 14:31:45 +02:00
Loïc Mathieu
1e0950fdf8 fix(system): import flow should set the tenantId 2025-06-09 13:51:53 +02:00
github-actions[bot]
4cddc704f4 chore(version): update to version '0.23.0-rc2-SNAPSHOT' 2025-06-09 10:48:43 +00:00
Miloš Paunović
f2f0e29f93 fix(namespaces): properly load flows when changing namespace (#9393)
Closes https://github.com/kestra-io/kestra/issues/9352.
2025-06-09 12:34:36 +02:00
Miloš Paunović
95011e022e fix(namespaces): reload namespace once the id parameter changes (#9372)
Closes https://github.com/kestra-io/kestra-ee/issues/3630.
2025-06-06 12:25:37 +02:00
brian.mulier
65503b708a fix(core): add DefaultFilterLanguage as default in KestraFilter
closes #9365
2025-06-05 17:42:34 +02:00
brian-mulier-p
876b8cb2e6 fix(core): avoid crashing when a taskrun has a too-large value (#9359)
closes #9312
2025-06-05 14:11:37 +02:00
Nicolas K.
f3b7592dfa fix(flows): #9319 error when pause with timeout triggers an execution (#9334)
* fix(flows): #9319 error when pause with timeout triggers an execution even after it's terminated

* fix(flows): only skip paused flow when execution is terminated

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-06-05 10:15:49 +02:00
brian.mulier
4dbeaf86bb fix(core): larger debounce for filter 2025-06-05 09:48:53 +02:00
brian.mulier
f98e78399d fix(core): handle whitespaces in label key and value 2025-06-05 09:48:43 +02:00
brian.mulier
71dac0f311 fix(core): smarter autocomplete order in editor 2025-06-05 09:48:00 +02:00
brian-mulier-p
3077d0ac7a fix(core): additional plugins are now properly shown in plugin docs (#9329)
closes kestra-io/plugin-langchain4j#61
2025-06-05 09:46:57 +02:00
YannC.
9504bbaffe fix(ci): put back bump helm chart and remove if condition 2025-06-05 08:48:56 +02:00
YannC.
159c9373ad fix(ci): checkout actions from main branch 2025-06-04 21:12:56 +02:00
YannC.
55b9088b55 fix(ci): modify actions order 2025-06-04 21:06:17 +02:00
YannC.
601d1a0abb fix(ci): Correctly pass all the secrets through all workflows 2025-06-04 15:10:33 +02:00
Florian Hussonnois
4a1cf98f26 chore(version): bump to version '0.23.0-rc1-SNAPSHOT' 2025-06-04 14:07:30 +02:00
224 changed files with 5786 additions and 3690 deletions

View File

@@ -43,6 +43,9 @@ jobs:
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
end:
runs-on: ubuntu-latest

View File

@@ -22,11 +22,11 @@ jobs:
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)(\.[0-9]+)-(rc[0-9])?(-SNAPSHOT)?$"
exit 1
fi
# Extract the major and minor versions
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
RELEASE_BRANCH="refs/heads/releases/v${BASE_VERSION}.x"
CURRENT_BRANCH="$GITHUB_REF"
if ! [[ "$CURRENT_BRANCH" == "$RELEASE_BRANCH" ]]; then
echo "Invalid release branch. Expected $RELEASE_BRANCH, was $CURRENT_BRANCH"
@@ -54,4 +54,4 @@ jobs:
git commit -m"chore(version): update to version '$RELEASE_VERSION'"
git push
git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
git push origin "v$RELEASE_VERSION"
git push --tags

View File

@@ -6,23 +6,15 @@ on:
GH_PERSONAL_TOKEN:
description: "The Github personal token."
required: true
push:
tags:
- '*'
SLACK_RELEASES_WEBHOOK_URL:
description: "The Slack webhook URL."
required: true
jobs:
publish:
name: Github - Release
runs-on: ubuntu-latest
steps:
# Download Exec
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
if: startsWith(github.ref, 'refs/tags/v')
with:
name: exe
path: build/executable
# Check out
- name: Checkout - Repository
uses: actions/checkout@v4
@@ -36,11 +28,20 @@ jobs:
with:
repository: kestra-io/actions
sparse-checkout-cone-mode: true
ref: fix/core-release
path: actions
sparse-checkout: |
.github/actions
# Download Exec
# Must be done after checkout actions
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
if: startsWith(github.ref, 'refs/tags/v')
with:
name: exe
path: build/executable
# GitHub Release
- name: Create GitHub release
uses: ./actions/.github/actions/github-release
@@ -49,3 +50,16 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
# Trigger gha workflow to bump helm chart version
- name: GitHub - Trigger the Helm chart version bump
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/helm-charts
event-type: update-helm-chart-version
client-payload: |-
{
"new_version": "${{ github.ref_name }}",
"github_repository": "${{ github.repository }}",
"github_actor": "${{ github.actor }}"
}

View File

@@ -39,8 +39,8 @@ jobs:
- name: Publish - Release package to Maven Central
shell: bash
env:
ORG_GRADLE_PROJECT_sonatypeUsername: ${{ secrets.SONATYPE_USER }}
ORG_GRADLE_PROJECT_sonatypePassword: ${{ secrets.SONATYPE_PASSWORD }}
ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE}}
@@ -50,7 +50,7 @@ jobs:
echo "signing.password=${SONATYPE_GPG_PASSWORD}" >> ~/.gradle/gradle.properties
echo "signing.secretKeyRingFile=${HOME}/.gradle/secring.gpg" >> ~/.gradle/gradle.properties
echo ${SONATYPE_GPG_FILE} | base64 -d > ~/.gradle/secring.gpg
./gradlew publishToSonatype ${{ startsWith(github.ref, 'refs/tags/v') && 'closeAndReleaseSonatypeStagingRepository' || '' }}
./gradlew publishToMavenCentral
# Gradle dependency
- name: Java - Gradle dependency graph

View File

@@ -42,6 +42,12 @@ on:
SONATYPE_GPG_FILE:
description: "The Sonatype GPG file."
required: true
GH_PERSONAL_TOKEN:
description: "The Github personal token."
required: true
SLACK_RELEASES_WEBHOOK_URL:
description: "The Slack webhook URL."
required: true
jobs:
build-artifacts:
name: Build - Artifacts
@@ -77,4 +83,5 @@ jobs:
if: startsWith(github.ref, 'refs/tags/v')
uses: ./.github/workflows/workflow-github-release.yml
secrets:
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}

View File

@@ -61,6 +61,7 @@
#plugin-jenkins:io.kestra.plugin:plugin-jenkins:LATEST
#plugin-jira:io.kestra.plugin:plugin-jira:LATEST
#plugin-kafka:io.kestra.plugin:plugin-kafka:LATEST
#plugin-kestra:io.kestra.plugin:plugin-kestra:LATEST
#plugin-kubernetes:io.kestra.plugin:plugin-kubernetes:LATEST
#plugin-langchain4j:io.kestra.plugin:plugin-langchain4j:LATEST
#plugin-ldap:io.kestra.plugin:plugin-ldap:LATEST

View File

@@ -31,12 +31,10 @@ plugins {
id 'com.github.node-gradle.node' version '7.1.0'
// release
id "io.github.gradle-nexus.publish-plugin" version "2.0.0"
id 'net.researchgate.release' version '3.1.0'
id "com.gorylenko.gradle-git-properties" version "2.5.0"
id 'signing'
id 'ru.vyarus.pom' version '3.0.0' apply false
id 'ru.vyarus.github-info' version '2.0.0' apply false
id "com.vanniktech.maven.publish" version "0.33.0"
// OWASP dependency check
id "org.owasp.dependencycheck" version "12.1.1" apply false
@@ -414,6 +412,7 @@ distTar.dependsOn shadowJar
startScripts.dependsOn shadowJar
startShadowScripts.dependsOn jar
shadowJar.dependsOn 'ui:assembleFrontend'
shadowJar.dependsOn jar
/**********************************************************************************************************************\
* Executable Jar
@@ -484,24 +483,11 @@ tasks.register('runStandalone', JavaExec) {
/**********************************************************************************************************************\
* Publish
**********************************************************************************************************************/
nexusPublishing {
repositoryDescription = "${project.group}:${rootProject.name}:${project.version}"
useStaging = !project.version.endsWith("-SNAPSHOT")
repositories {
sonatype {
nexusUrl.set(uri("https://s01.oss.sonatype.org/service/local/"))
snapshotRepositoryUrl.set(uri("https://s01.oss.sonatype.org/content/repositories/snapshots/"))
}
}
}
subprojects {subProject ->
subprojects {
if (it.name != 'jmh-benchmarks') {
apply plugin: "maven-publish"
if (subProject.name != 'jmh-benchmarks' && subProject.name != rootProject.name) {
apply plugin: 'signing'
apply plugin: 'ru.vyarus.pom'
apply plugin: 'ru.vyarus.github-info'
apply plugin: "com.vanniktech.maven.publish"
javadoc {
options {
@@ -535,56 +521,98 @@ subprojects {
}
}
github {
user 'kestra-io'
license 'Apache'
repository 'kestra'
site 'https://kestra.io'
//These modules should not be published
def unpublishedModules = ["jdbc-mysql", "jdbc-postgres", "webserver"]
if (subProject.name in unpublishedModules){
return
}
maven.pom {
description = 'The modern, scalable orchestrator & scheduler open source platform'
mavenPublishing {
publishToMavenCentral(true)
signAllPublications()
developers {
developer {
id = "tchiotludo"
name = "Ludovic Dehon"
coordinates(
"${rootProject.group}",
subProject.name == "cli" ? rootProject.name : subProject.name,
"${rootProject.version}"
)
pom {
name = project.name
description = "${project.group}:${project.name}:${rootProject.version}"
url = "https://github.com/kestra-io/${rootProject.name}"
licenses {
license {
name = "The Apache License, Version 2.0"
url = "http://www.apache.org/licenses/LICENSE-2.0.txt"
}
}
developers {
developer {
id = "tchiotludo"
name = "Ludovic Dehon"
email = "ldehon@kestra.io"
}
}
scm {
connection = 'scm:git:'
url = "https://github.com/kestra-io/${rootProject.name}"
}
}
}
publishing {
publications {
sonatypePublication(MavenPublication) {
version project.version
afterEvaluate {
publishing {
publications {
withType(MavenPublication).configureEach { publication ->
if (project.name.contains('cli')) {
groupId "io.kestra"
artifactId "kestra"
artifact shadowJar
artifact executableJar
} else if (project.name.contains('platform')){
groupId project.group
artifactId project.name
} else {
from components.java
groupId project.group
artifactId project.name
artifact sourcesJar
artifact javadocJar
artifact testsJar
if (subProject.name == "platform") {
// Clear all artifacts except the BOM
publication.artifacts.clear()
}
}
}
}
}
signing {
// only sign JARs that we publish to Sonatype
required { gradle.taskGraph.hasTask("publishSonatypePublicationPublicationToSonatypeRepository") }
sign publishing.publications.sonatypePublication
if (subProject.name == 'cli') {
/* Make sure the special publication is wired *after* every plugin */
subProject.afterEvaluate {
/* 1. Remove the default java component so Gradle stops expecting
the standard cli-*.jar, sources, javadoc, etc. */
components.removeAll { it.name == "java" }
/* 2. Replace the publications artifacts with shadow + exec */
publishing.publications.withType(MavenPublication).configureEach { pub ->
pub.artifacts.clear()
// main shadow JAR built at root
pub.artifact(rootProject.tasks.named("shadowJar").get()) {
extension = "jar"
}
// executable ZIP built at root
pub.artifact(rootProject.tasks.named("executableJar").get().archiveFile) {
classifier = "exec"
extension = "zip"
}
pub.artifact(tasks.named("sourcesJar").get())
pub.artifact(tasks.named("javadocJar").get())
}
/* 3. Disable Gradle-module metadata for this publication to
avoid the “artifact removed from java component” error. */
tasks.withType(GenerateModuleMetadata).configureEach { it.enabled = false }
/* 4. Make every publish task in :cli wait for the two artifacts */
tasks.matching { it.name.startsWith("publish") }.configureEach {
dependsOn rootProject.tasks.named("shadowJar")
dependsOn rootProject.tasks.named("executableJar")
}
}
}
tasks.withType(GenerateModuleMetadata).configureEach {
@@ -595,6 +623,7 @@ subprojects {
}
/**********************************************************************************************************************\
* Version
**********************************************************************************************************************/

View File

@@ -37,4 +37,4 @@ dependencies {
//test
testImplementation "org.wiremock:wiremock-jetty12"
}
}

View File

@@ -12,8 +12,8 @@ import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
import jakarta.inject.Inject;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
@@ -26,6 +26,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
@Singleton
@Slf4j
@Requires(property = "micronaut.io.watch.enabled", value = "true")
@@ -111,6 +113,8 @@ public class FileChangedEventListener {
}
public void startListening(List<Path> paths) throws IOException, InterruptedException {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
for (Path path : paths) {
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
}
@@ -189,6 +193,8 @@ public class FileChangedEventListener {
}
private void loadFlowsFromFolder(Path folder) {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
try {
Files.walkFileTree(folder, new SimpleFileVisitor<Path>() {
@Override
@@ -232,6 +238,8 @@ public class FileChangedEventListener {
}
private Optional<FlowWithSource> parseFlow(String content, Path entry) {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
try {
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(tenantId, content, false);
modelValidator.validate(flow);

View File

@@ -15,6 +15,9 @@ micronaut:
static:
paths: classpath:static
mapping: /static/**
root:
paths: classpath:root
mapping: /**
server:
max-request-size: 10GB
multipart:

View File

@@ -21,7 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class PluginDocCommandTest {
public static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.18.0-SNAPSHOT.jar";
public static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.24.0-SNAPSHOT.jar";
@Test
void run() throws IOException, URISyntaxException {

View File

@@ -20,7 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class PluginListCommandTest {
private static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.18.0-SNAPSHOT.jar";
private static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.24.0-SNAPSHOT.jar";
@Test
void shouldListPluginsInstalledLocally() throws IOException, URISyntaxException {

View File

@@ -284,7 +284,7 @@ public class HttpClient implements Closeable {
} else if (cls.isAssignableFrom(Byte[].class)) {
return (T) ArrayUtils.toObject(EntityUtils.toByteArray(entity));
} else {
return (T) JacksonMapper.ofJson().readValue(entity.getContent(), cls);
return (T) JacksonMapper.ofJson(false).readValue(entity.getContent(), cls);
}
}

View File

@@ -1,11 +1,14 @@
package io.kestra.core.metrics;
import io.kestra.core.models.ServerType;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micronaut.configuration.metrics.aggregator.MeterRegistryConfigurer;
import io.micronaut.context.annotation.Requires;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@@ -15,20 +18,26 @@ public class GlobalTagsConfigurer implements MeterRegistryConfigurer<SimpleMeter
@Inject
MetricConfig metricConfig;
@Nullable
@Value("${kestra.server-type}")
ServerType serverType;
@Override
public void configure(SimpleMeterRegistry meterRegistry) {
if (metricConfig.getTags() != null) {
meterRegistry
.config()
.commonTags(
metricConfig.getTags()
.entrySet()
.stream()
.flatMap(e -> Stream.of(e.getKey(), e.getValue()))
.toList()
.toArray(String[]::new)
);
}
String[] tags = Stream
.concat(
metricConfig.getTags() != null ? metricConfig.getTags()
.entrySet()
.stream()
.flatMap(e -> Stream.of(e.getKey(), e.getValue())) : Stream.empty(),
serverType != null ? Stream.of("server_type", serverType.name()) : Stream.empty()
)
.toList()
.toArray(String[]::new);
meterRegistry
.config()
.commonTags(tags);
}
@Override

View File

@@ -15,6 +15,8 @@ import jakarta.validation.constraints.NotNull;
@NoArgsConstructor
public class Setting {
public static final String INSTANCE_UUID = "instance.uuid";
public static final String INSTANCE_VERSION = "instance.version";
@NotNull
private String key;

View File

@@ -156,6 +156,7 @@ public class Execution implements DeletedInterface, TenantInterface {
.flowRevision(flow.getRevision())
.state(new State())
.scheduleDate(scheduleDate.map(ChronoZonedDateTime::toInstant).orElse(null))
.variables(flow.getVariables())
.build();
List<Label> executionLabels = new ArrayList<>(LabelService.labelsExcludingSystem(flow));

View File

@@ -7,6 +7,7 @@ import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
@@ -62,6 +63,11 @@ public class TaskRun implements TenantInterface {
@With
Boolean dynamic;
// Set it to true to force execution even if the execution is killed
@Nullable
@With
Boolean forceExecution;
@Deprecated
public void setItems(String items) {
// no-op for backward compatibility
@@ -81,7 +87,8 @@ public class TaskRun implements TenantInterface {
this.outputs,
this.state.withState(state),
this.iteration,
this.dynamic
this.dynamic,
this.forceExecution
);
}
@@ -99,7 +106,8 @@ public class TaskRun implements TenantInterface {
this.outputs,
newState,
this.iteration,
this.dynamic
this.dynamic,
this.forceExecution
);
}
@@ -121,7 +129,8 @@ public class TaskRun implements TenantInterface {
this.outputs,
this.state.withState(State.Type.FAILED),
this.iteration,
this.dynamic
this.dynamic,
this.forceExecution
);
}

View File

@@ -1,20 +1,19 @@
package io.kestra.core.models.flows;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
@SuperBuilder
@Getter
@NoArgsConstructor
@Introspected
public class Concurrency {
@Positive
@Min(1)
@NotNull
private Integer limit;

View File

@@ -3,6 +3,7 @@ package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.tasks.Task;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
@@ -253,9 +254,22 @@ public class State {
return this == Type.KILLED;
}
/**
* @return states that are terminal to an execution
*/
public static List<Type> terminatedTypes() {
return Stream.of(Type.values()).filter(type -> type.isTerminated()).toList();
}
/**
* Compute the final 'failure' of a task depending on <code>allowFailure</code> and <code>allowWarning</code>:
* - if both are true -> SUCCESS
* - if only <code>allowFailure</code> is true -> WARNING
* - if none -> FAILED
*/
public static State.Type fail(Task task) {
return task.isAllowFailure() ? (task.isAllowWarning() ? State.Type.SUCCESS : State.Type.WARNING) : State.Type.FAILED;
}
}
@Value

View File

@@ -108,7 +108,7 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
private List<String> renderExpressionValues(final Function<String, Object> renderer) {
Object result;
try {
result = renderer.apply(expression);
result = renderer.apply(expression.trim());
} catch (Exception e) {
throw ManualConstraintViolation.toConstraintViolationException(
"Cannot render 'expression'. Cause: " + e.getMessage(),

View File

@@ -86,7 +86,7 @@ public class SelectInput extends Input<String> implements RenderableInput {
private List<String> renderExpressionValues(final Function<String, Object> renderer) {
Object result;
try {
result = renderer.apply(expression);
result = renderer.apply(expression.trim());
} catch (Exception e) {
throw ManualConstraintViolation.toConstraintViolationException(
"Cannot render 'expression'. Cause: " + e.getMessage(),

View File

@@ -329,6 +329,14 @@ public class DefaultPluginRegistry implements PluginRegistry {
pluginClassByIdentifier.clear();
}
/**
* {@inheritDoc}
**/
@Override
public boolean isVersioningSupported() {
return false;
}
public record PluginBundleIdentifier(@Nullable URL location) {
public static PluginBundleIdentifier CORE = new PluginBundleIdentifier(null);

View File

@@ -151,7 +151,7 @@ public class LocalPluginManager implements PluginManager {
* {@inheritDoc}
**/
@Override
public PluginArtifact install(File file, boolean installForRegistration, @Nullable Path localRepositoryPath) {
public PluginArtifact install(File file, boolean installForRegistration, @Nullable Path localRepositoryPath, boolean forceInstallOnExistingVersions) {
try {
PluginArtifact artifact = PluginArtifact.fromFile(file);
log.info("Installing managed plugin artifact '{}'", artifact);

View File

@@ -55,14 +55,16 @@ public interface PluginManager extends AutoCloseable {
/**
* Installs the given plugin artifact.
*
* @param file the plugin JAR file.
* @param installForRegistration specify whether plugin artifacts should be scanned and registered.
* @param localRepositoryPath the optional local repository path to install artifact.
* @param file the plugin JAR file.
* @param installForRegistration specify whether plugin artifacts should be scanned and registered.
* @param localRepositoryPath the optional local repository path to install artifact.
* @param forceInstallOnExistingVersions specify whether the plugin should be force-installed over the existing one
* @return The URI of the installed plugin.
*/
PluginArtifact install(final File file,
boolean installForRegistration,
@Nullable Path localRepositoryPath);
@Nullable Path localRepositoryPath,
boolean forceInstallOnExistingVersions);
/**
* Installs the given plugin artifact.

View File

@@ -116,4 +116,11 @@ public interface PluginRegistry {
default void clear() {
}
/**
* Checks whether plugin-versioning is supported by this registry.
*
* @return {@code true} if supported. Otherwise {@code false}.
*/
boolean isVersioningSupported();
}

View File

@@ -18,9 +18,11 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -202,19 +204,13 @@ public class PluginScanner {
var guidesDirectory = classLoader.getResource("doc/guides");
if (guidesDirectory != null) {
try (var fileSystem = FileSystems.newFileSystem(guidesDirectory.toURI(), Collections.emptyMap())) {
var root = fileSystem.getPath("/doc/guides");
try (var stream = Files.walk(root, 1)) {
stream
.skip(1) // first element is the root element
.sorted(Comparator.comparing(path -> path.getName(path.getParent().getNameCount()).toString()))
.forEach(guide -> {
var guideName = guide.getName(guide.getParent().getNameCount()).toString();
guides.add(guideName.substring(0, guideName.lastIndexOf('.')));
});
}
try {
var root = Path.of(guidesDirectory.toURI());
addGuides(root, guides);
} catch (IOException | URISyntaxException e) {
// silently fail
} catch (FileSystemNotFoundException e) {
addGuidesThroughNewFileSystem(guidesDirectory, guides);
}
}
@@ -243,6 +239,27 @@ public class PluginScanner {
.build();
}
private static void addGuidesThroughNewFileSystem(URL guidesDirectory, List<String> guides) {
try (var fileSystem = FileSystems.newFileSystem(guidesDirectory.toURI(), Collections.emptyMap())) {
var root = fileSystem.getPath("doc/guides");
addGuides(root, guides);
} catch (IOException | URISyntaxException e) {
// silently fail
}
}
private static void addGuides(Path root, List<String> guides) throws IOException {
try (var stream = Files.walk(root, 1)) {
stream
.skip(1) // first element is the root element
.sorted(Comparator.comparing(path -> path.getName(path.getParent().getNameCount()).toString()))
.forEach(guide -> {
var guideName = guide.getName(guide.getParent().getNameCount()).toString();
guides.add(guideName.substring(0, guideName.lastIndexOf('.')));
});
}
}
public static Manifest getManifest(ClassLoader classLoader) {
try {
URL url = classLoader.getResource(JarFile.MANIFEST_NAME);

View File

@@ -86,7 +86,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
DeserializationContext context) throws IOException {
Class<? extends Plugin> pluginType = null;
final String identifier = extractPluginRawIdentifier(node);
final String identifier = extractPluginRawIdentifier(node, pluginRegistry.isVersioningSupported());
if (identifier != null) {
log.trace("Looking for Plugin for: {}",
identifier
@@ -103,7 +103,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
);
if (DataChart.class.isAssignableFrom(pluginType)) {
final Class<? extends Plugin> dataFilterClass = pluginRegistry.findClassByIdentifier(extractPluginRawIdentifier(node.get("data")));
final Class<? extends Plugin> dataFilterClass = pluginRegistry.findClassByIdentifier(extractPluginRawIdentifier(node.get("data"), pluginRegistry.isVersioningSupported()));
ParameterizedType genericDataFilterClass = (ParameterizedType) dataFilterClass.getGenericSuperclass();
Type dataFieldsEnum = genericDataFilterClass.getActualTypeArguments()[0];
TypeFactory typeFactory = JacksonMapper.ofJson().getTypeFactory();
@@ -142,7 +142,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
);
}
static String extractPluginRawIdentifier(final JsonNode node) {
static String extractPluginRawIdentifier(final JsonNode node, final boolean isVersioningSupported) {
String type = Optional.ofNullable(node.get(TYPE)).map(JsonNode::textValue).orElse(null);
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::textValue).orElse(null);
@@ -150,6 +150,6 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
return null;
}
return version != null && !version.isEmpty() ? type + ":" + version : type;
return isVersioningSupported && version != null && !version.isEmpty() ? type + ":" + version : type;
}
}

View File

@@ -67,6 +67,9 @@ public class ExecutorService {
@Inject
private WorkerGroupExecutorInterface workerGroupExecutorInterface;
@Inject
private WorkerJobRunningStateStore workerJobRunningStateStore;
protected FlowMetaStoreInterface flowExecutorInterface;
@Inject
@@ -664,7 +667,7 @@ public class ExecutorService {
.taskRunId(workerTaskResult.getTaskRun().getId())
.executionId(executor.getExecution().getId())
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(duration != null ? duration : timeout))
.state(duration != null ? behavior.mapToState() : State.Type.FAILED)
.state(duration != null ? behavior.mapToState() : State.Type.fail(pauseTask))
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
}
@@ -732,6 +735,7 @@ public class ExecutorService {
List<TaskRun> afterExecutionNexts = FlowableUtils.resolveSequentialNexts(executor.getExecution(), afterExecutionResolvedTasks)
.stream()
.map(throwFunction(NextTaskRun::getTaskRun))
.map(taskRun -> taskRun.withForceExecution(true)) // forceExecution so it would be executed even if the execution is killed
.toList();
if (!afterExecutionNexts.isEmpty()) {
return executor.withTaskRun(afterExecutionNexts, "handleAfterExecution ");
@@ -1072,6 +1076,25 @@ public class ExecutorService {
newExecution = executionService.killParentTaskruns(taskRun, newExecution);
}
executor.withExecution(newExecution, "addWorkerTaskResult");
if (taskRun.getState().isTerminated()) {
log.trace("TaskRun terminated: {}", taskRun);
workerJobRunningStateStore.deleteByKey(taskRun.getId());
metricRegistry
.counter(
MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT,
MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT_DESCRIPTION,
metricRegistry.tags(workerTaskResult)
)
.increment();
metricRegistry
.timer(
MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION,
MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION_DESCRIPTION,
metricRegistry.tags(workerTaskResult)
)
.record(taskRun.getState().getDuration());
}
}
// Note: as the flow is only used in an error branch and it can take time to load, we pass it through a Supplier

View File

@@ -12,6 +12,7 @@ import io.kestra.core.models.flows.input.SecretInput;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.trigger.Schedule;
import lombok.AllArgsConstructor;
import lombok.With;
@@ -27,6 +28,7 @@ import java.util.function.Consumer;
*/
public final class RunVariables {
public static final String SECRET_CONSUMER_VARIABLE_NAME = "addSecretConsumer";
public static final String FIXTURE_FILES_KEY = "io.kestra.datatype:test_fixtures_files";
/**
* Creates an immutable map representation of the given {@link Task}.
@@ -181,9 +183,6 @@ public final class RunVariables {
// Flow
if (flow != null) {
builder.put("flow", RunVariables.of(flow));
if (flow.getVariables() != null) {
builder.put("vars", flow.getVariables());
}
}
// Task
@@ -298,16 +297,19 @@ public final class RunVariables {
if (execution.getTrigger() != null && execution.getTrigger().getVariables() != null) {
builder.put("trigger", execution.getTrigger().getVariables());
// temporary hack to add back the `schedule` variables
// will be removed in 2.0
if (Schedule.class.getName().equals(execution.getTrigger().getType())) {
// add back its variables inside the `schedule` variables
builder.put("schedule", execution.getTrigger().getVariables());
}
}
if (execution.getLabels() != null) {
builder.put("labels", Label.toNestedMap(execution.getLabels()));
}
if (execution.getVariables() != null) {
builder.putAll(execution.getVariables());
}
if (flow == null) {
Flow flowFromExecution = Flow.builder()
.id(execution.getFlowId())
@@ -319,6 +321,20 @@ public final class RunVariables {
}
}
// variables
Optional.ofNullable(execution)
.map(Execution::getVariables)
.or(() -> Optional.ofNullable(flow).map(FlowInterface::getVariables))
.map(HashMap::new)
.ifPresent(variables -> {
Object fixtureFiles = variables.remove(FIXTURE_FILES_KEY);
builder.put("vars", ImmutableMap.copyOf(variables));
if (fixtureFiles != null) {
builder.put("files", fixtureFiles);
}
});
// Kestra configuration
if (kestraConfiguration != null) {
Map<String, String> kestra = HashMap.newHashMap(2);

View File

@@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
@@ -19,6 +18,7 @@ import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.server.*;
import io.kestra.core.services.LabelService;
import io.kestra.core.services.LogService;
import io.kestra.core.services.MaintenanceService;
import io.kestra.core.services.VariablesService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.storages.StorageContext;
@@ -158,6 +158,9 @@ public class Worker implements Service, Runnable, AutoCloseable {
private TracerFactory tracerFactory;
private Tracer tracer;
@Inject
private MaintenanceService maintenanceService;
/**
* Creates a new {@link Worker} instance.
*
@@ -285,8 +288,12 @@ public class Worker implements Service, Runnable, AutoCloseable {
));
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
if (this.maintenanceService.isInMaintenanceMode()) {
enterMaintenance();
} else {
setState(ServiceState.RUNNING);
}
setState(ServiceState.RUNNING);
if (workerGroupKey != null) {
log.info("Worker started with {} thread(s) in group '{}'", numThreads, workerGroupKey);
}
@@ -304,21 +311,25 @@ public class Worker implements Service, Runnable, AutoCloseable {
ClusterEvent clusterEvent = either.getLeft();
log.info("Cluster event received: {}", clusterEvent);
switch (clusterEvent.eventType()) {
case MAINTENANCE_ENTER -> {
this.executionKilledQueue.pause();
this.workerJobQueue.pause();
this.setState(ServiceState.MAINTENANCE);
}
case MAINTENANCE_EXIT -> {
this.executionKilledQueue.resume();
this.workerJobQueue.resume();
this.setState(ServiceState.RUNNING);
}
case MAINTENANCE_ENTER -> enterMaintenance();
case MAINTENANCE_EXIT -> exitMaintenance();
}
}
private void enterMaintenance() {
this.executionKilledQueue.pause();
this.workerJobQueue.pause();
this.setState(ServiceState.MAINTENANCE);
}
private void exitMaintenance() {
this.executionKilledQueue.resume();
this.workerJobQueue.resume();
this.setState(ServiceState.RUNNING);
}
private void setState(final ServiceState state) {
this.state.set(state);
Map<String, Object> properties = new HashMap<>();
@@ -395,11 +406,16 @@ public class Worker implements Service, Runnable, AutoCloseable {
} catch (IllegalVariableEvaluationException e) {
RunContextLogger contextLogger = runContextLoggerFactory.create(currentWorkerTask);
contextLogger.logger().error("Failed evaluating runIf: {}", e.getMessage(), e);
try {
this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.fail()));
} catch (QueueException ex) {
log.error("Unable to emit the worker task result for task {} taskrun {}", currentWorkerTask.getTask().getId(), currentWorkerTask.getTaskRun().getId(), e);
}
} catch (QueueException e) {
log.error("Unable to emit the worker task result for task {} taskrun {}", currentWorkerTask.getTask().getId(), currentWorkerTask.getTaskRun().getId(), e);
}
if (workerTaskResult.getTaskRun().getState().isFailed() && !currentWorkerTask.getTask().isAllowFailure()) {
if (workerTaskResult == null || workerTaskResult.getTaskRun().getState().isFailed() && !currentWorkerTask.getTask().isAllowFailure()) {
break;
}
@@ -624,7 +640,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
));
}
if (killedExecution.contains(workerTask.getTaskRun().getExecutionId())) {
if (! Boolean.TRUE.equals(workerTask.getTaskRun().getForceExecution()) && killedExecution.contains(workerTask.getTaskRun().getExecutionId())) {
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun().withState(KILLED));
try {
this.workerTaskResultQueue.emit(workerTaskResult);
@@ -776,7 +792,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
if (!(workerTask.getTask() instanceof RunnableTask<?> task)) {
// This should never happen but better to deal with it than crashing the Worker
var state = workerTask.getTask().isAllowFailure() ? workerTask.getTask().isAllowWarning() ? SUCCESS : WARNING : FAILED;
var state = State.Type.fail(workerTask.getTask());
TaskRunAttempt attempt = TaskRunAttempt.builder()
.state(new io.kestra.core.models.flows.State().withState(state))
.workerId(this.id)

View File

@@ -0,0 +1,20 @@
package io.kestra.core.runners;
/**
* State store containing all workers' jobs in RUNNING state.
*
* @see WorkerJob
*/
public interface WorkerJobRunningStateStore {
/**
* Deletes a running worker job for the given key.
*
* <p>
* A key can be a {@link WorkerTask} Task Run ID.
* </p>
*
* @param key the key of the worker job to be deleted.
*/
void deleteByKey(String key);
}

View File

@@ -48,7 +48,7 @@ public class WorkerTask extends WorkerJob {
* @return this worker task, updated
*/
public TaskRun fail() {
var state = this.task.isAllowFailure() ? this.task.isAllowWarning() ? State.Type.SUCCESS : State.Type.WARNING : State.Type.FAILED;
var state = State.Type.fail(task);
return this.getTaskRun().withState(state);
}
}

View File

@@ -86,6 +86,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private final LogService logService;
protected SchedulerExecutionStateInterface executionState;
private final WorkerGroupExecutorInterface workerGroupExecutorInterface;
private final MaintenanceService maintenanceService;
// must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
private volatile Boolean isReady = false;
@@ -136,6 +137,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.serviceStateEventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.executionEventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.workerGroupExecutorInterface = applicationContext.getBean(WorkerGroupExecutorInterface.class);
this.maintenanceService = applicationContext.getBean(MaintenanceService.class);
setState(ServiceState.CREATED);
}
@@ -289,8 +292,11 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// listen to cluster events
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(((QueueInterface<ClusterEvent>) clusterEventQueueInterface).receive(this::clusterEventQueue)));
setState(ServiceState.RUNNING);
if (this.maintenanceService.isInMaintenanceMode()) {
enterMaintenance();
} else {
setState(ServiceState.RUNNING);
}
log.info("Scheduler started");
}
@@ -399,31 +405,35 @@ public abstract class AbstractScheduler implements Scheduler, Service {
ClusterEvent clusterEvent = either.getLeft();
log.info("Cluster event received: {}", clusterEvent);
switch (clusterEvent.eventType()) {
case MAINTENANCE_ENTER -> {
this.executionQueue.pause();
this.triggerQueue.pause();
this.workerJobQueue.pause();
this.workerTriggerResultQueue.pause();
this.executionKilledQueue.pause();
this.pauseAdditionalQueues();
this.isPaused.set(true);
this.setState(ServiceState.MAINTENANCE);
}
case MAINTENANCE_EXIT -> {
this.executionQueue.resume();
this.triggerQueue.resume();
this.workerJobQueue.resume();
this.workerTriggerResultQueue.resume();
this.executionKilledQueue.resume();
this.resumeAdditionalQueues();
this.isPaused.set(false);
this.setState(ServiceState.RUNNING);
}
case MAINTENANCE_ENTER -> enterMaintenance();
case MAINTENANCE_EXIT -> exitMaintenance();
}
}
private void enterMaintenance() {
this.executionQueue.pause();
this.triggerQueue.pause();
this.workerJobQueue.pause();
this.workerTriggerResultQueue.pause();
this.executionKilledQueue.pause();
this.pauseAdditionalQueues();
this.isPaused.set(true);
this.setState(ServiceState.MAINTENANCE);
}
private void exitMaintenance() {
this.executionQueue.resume();
this.triggerQueue.resume();
this.workerJobQueue.resume();
this.workerTriggerResultQueue.resume();
this.executionKilledQueue.resume();
this.resumeAdditionalQueues();
this.isPaused.set(false);
this.setState(ServiceState.RUNNING);
}
protected void resumeAdditionalQueues() {
// by default: do nothing
}
@@ -996,6 +1006,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
}
setState(ServiceState.TERMINATING);
this.receiveCancellations.forEach(Runnable::run);
this.scheduleExecutor.shutdown();
this.executionMonitorExecutor.shutdown();
try {
if (onClose != null) {
onClose.run();
@@ -1003,9 +1016,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
} catch (Exception e) {
log.error("Unexpected error while terminating scheduler.", e);
}
this.receiveCancellations.forEach(Runnable::run);
this.scheduleExecutor.shutdown();
this.executionMonitorExecutor.shutdown();
setState(ServiceState.TERMINATED_GRACEFULLY);
if (log.isDebugEnabled()) {

View File

@@ -5,6 +5,7 @@ import com.amazon.ion.IonSystem;
import com.amazon.ion.system.*;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
@@ -36,6 +37,8 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import static com.fasterxml.jackson.core.StreamReadConstraints.DEFAULT_MAX_STRING_LEN;
public final class JacksonMapper {
public static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
public static final TypeReference<List<Object>> LIST_TYPE_REFERENCE = new TypeReference<>() {};
@@ -43,6 +46,12 @@ public final class JacksonMapper {
private JacksonMapper() {}
static {
StreamReadConstraints.overrideDefaultStreamReadConstraints(
StreamReadConstraints.builder().maxNameLength(DEFAULT_MAX_STRING_LEN).build()
);
}
private static final ObjectMapper MAPPER = JacksonMapper.configure(
new ObjectMapper()
);
@@ -52,7 +61,7 @@ public final class JacksonMapper {
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
public static ObjectMapper ofJson() {
return MAPPER;
return JacksonMapper.ofJson(true);
}
public static ObjectMapper ofJson(boolean strict) {

View File

@@ -111,7 +111,7 @@ public interface Service extends AutoCloseable {
* </pre>
*/
enum ServiceState {
CREATED(1, 2, 3), // 0
CREATED(1, 2, 3, 4, 9), // 0
RUNNING(2, 3, 4, 9), // 1
ERROR(4), // 2
DISCONNECTED(4, 7), // 3

View File

@@ -176,7 +176,7 @@ public class FlowService {
previous :
FlowWithSource.of(flowToImport.toBuilder().revision(previous.getRevision() + 1).build(), source)
)
.orElseGet(() -> FlowWithSource.of(flowToImport, source).toBuilder().revision(1).build());
.orElseGet(() -> FlowWithSource.of(flowToImport, source).toBuilder().tenantId(tenantId).revision(1).build());
} else {
return maybeExisting
.map(previous -> repository().update(flow, previous))

View File

@@ -0,0 +1,14 @@
package io.kestra.core.services;
import jakarta.inject.Singleton;
@Singleton
public class MaintenanceService {
/**
* @return true if the cluster is in maintenance mode
*/
public boolean isInMaintenanceMode() {
// maintenance mode is an EE feature
return false;
}
}

View File

@@ -5,16 +5,19 @@ import io.kestra.core.test.TestState;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.net.URI;
import java.util.List;
public record UnitTestResult(
@NotNull
String unitTestId,
String testId,
@NotNull
String unitTestType,
String testType,
@NotNull
String executionId,
@NotNull
URI url,
@NotNull
TestState state,
@NotNull
List<AssertionResult> assertionResults,
@@ -22,14 +25,13 @@ public record UnitTestResult(
List<AssertionRunError> errors,
Fixtures fixtures
) {
public static UnitTestResult of(String unitTestId, String unitTestType, String executionId, List<AssertionResult> results, List<AssertionRunError> errors, @Nullable Fixtures fixtures) {
public static UnitTestResult of(String unitTestId, String unitTestType, String executionId, URI url, List<AssertionResult> results, List<AssertionRunError> errors, @Nullable Fixtures fixtures) {
TestState state;
if(!errors.isEmpty()){
state = TestState.ERROR;
} else {
state = results.stream().anyMatch(assertion -> !assertion.isSuccess()) ? TestState.FAILED : TestState.SUCCESS;
}
return new UnitTestResult(unitTestId, unitTestType, executionId, state, results, errors, fixtures);
return new UnitTestResult(unitTestId, unitTestType, executionId, url, state, results, errors, fixtures);
}
}

View File

@@ -73,7 +73,7 @@ public class GraphUtils {
)))
.orElse(Collections.emptyMap());
triggersDeclarations.forEach(trigger -> {
triggersDeclarations.stream().filter(trigger -> trigger != null).forEach(trigger -> {
GraphTrigger triggerNode = new GraphTrigger(trigger, triggersById.get(trigger.getId()));
triggerCluster.addNode(triggerNode);
triggerCluster.addEdge(triggerCluster.getRoot(), triggerNode, new Relation());

View File

@@ -1,5 +1,7 @@
package io.kestra.core.utils;
import io.kestra.core.models.Setting;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.micronaut.context.env.Environment;
import io.micronaut.context.env.PropertiesPropertySourceLoader;
import io.micronaut.context.env.PropertySource;
@@ -29,6 +31,9 @@ public class VersionProvider {
@Inject
private Environment environment;
@Inject
private Optional<SettingRepositoryInterface> settingRepository; // repositories are not always available in unit tests
@PostConstruct
public void start() {
final Optional<PropertySource> gitProperties = new PropertiesPropertySourceLoader()
@@ -40,6 +45,18 @@ public class VersionProvider {
this.revision = loadRevision(gitProperties);
this.date = loadTime(gitProperties);
this.version = loadVersion(buildProperties, gitProperties);
// check the version in the settings and update it if needed; we don't use it yet, but it would allow us to detect incompatible updates later if needed
if (settingRepository.isPresent()) {
Optional<Setting> versionSetting = settingRepository.get().findByKey(Setting.INSTANCE_VERSION);
if (versionSetting.isEmpty() || !versionSetting.get().getValue().equals(this.version)) {
settingRepository.get().save(Setting.builder()
.key(Setting.INSTANCE_VERSION)
.value(this.version)
.build()
);
}
}
}
private String loadVersion(final Optional<PropertySource> buildProperties,

View File

@@ -0,0 +1,75 @@
package io.kestra.plugin.core.execution;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ExecutionUpdatableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.Map;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Allow to set execution variables. These variables will then be available via the `{{ vars.name }}` expression."
)
@Plugin(
examples = {
@Example(
full = true,
title = "Set variables",
code = """
id: variables
namespace: company.team
variables:
name: World
tasks:
- id: set_vars
type: io.kestra.plugin.core.execution.SetVariables
variables:
message: Hello
name: Loïc
- id: hello
type: io.kestra.plugin.core.log.Log
message: "{{ vars.message }} {{ vars.name }}\""""
)
}
)
public class SetVariables extends Task implements ExecutionUpdatableTask {
@Schema(title = "The variables")
@NotNull
private Property<Map<String, Object>> variables;
@Schema(title = "Whether to overwrite existing variables")
@NotNull
@Builder.Default
private Property<Boolean> overwrite = Property.ofValue(true);
@Override
public Execution update(Execution execution, RunContext runContext) throws Exception {
Map<String, Object> renderedVars = runContext.render(this.variables).asMap(String.class, Object.class);
boolean renderedOverwrite = runContext.render(overwrite).as(Boolean.class).orElseThrow();
if (!renderedOverwrite) {
// check that none of the new variables already exist
List<String> duplicated = renderedVars.keySet().stream().filter(key -> execution.getVariables().containsKey(key)).toList();
if (!duplicated.isEmpty()) {
throw new IllegalArgumentException("`overwrite` is set to false and the following variables already exist: " + String.join(",", duplicated));
}
}
return execution.withVariables(MapUtils.merge(execution.getVariables(), renderedVars));
}
}

View File

@@ -0,0 +1,89 @@
package io.kestra.plugin.core.execution;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ExecutionUpdatableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.Map;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Allow to unset execution variables."
)
@Plugin(
examples = {
@Example(
full = true,
title = "Set and later unset variables",
code = """
id: variables
namespace: company.team
variables:
name: World
tasks:
- id: set_vars
type: io.kestra.plugin.core.execution.SetVariables
variables:
message: Hello
name: Loïc
- id: hello
type: io.kestra.plugin.core.log.Log
message: "{{ vars.message }} {{ vars.name }}"
- id: unset_variables
type: io.kestra.plugin.core.execution.UnsetVariables
variables:
- message
- name"""
)
}
)
public class UnsetVariables extends Task implements ExecutionUpdatableTask {
@Schema(title = "The variables")
@NotNull
private Property<List<String>> variables;
@Schema(title = "Whether to ignore missing variables")
@NotNull
@Builder.Default
private Property<Boolean> ignoreMissing = Property.ofValue(false);
@Override
public Execution update(Execution execution, RunContext runContext) throws Exception {
List<String> renderedVariables = runContext.render(variables).asList(String.class);
boolean renderedIgnoreMissing = runContext.render(ignoreMissing).as(Boolean.class).orElseThrow();
Map<String, Object> variables = execution.getVariables();
for (String key : renderedVariables) {
removeVar(variables, key, renderedIgnoreMissing);
}
return execution.withVariables(variables);
}
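// Dotted keys (e.g. "outputs.message") are resolved recursively: each prefix is looked up as a nested map
// and only the final segment is removed.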
private void removeVar(Map<String, Object> vars, String key, boolean ignoreMissing) {
if (key.indexOf('.') >= 0) {
String prefix = key.substring(0, key.indexOf('.'));
String suffix = key.substring(key.indexOf('.') + 1);
removeVar((Map<String, Object>) vars.get(prefix), suffix, ignoreMissing);
} else {
if (ignoreMissing && !vars.containsKey(key)) {
return;
}
vars.remove(key);
}
}
}

View File

@@ -555,7 +555,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
builder.uri(uri);
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
var state = this.isAllowFailure() ? State.Type.WARNING : State.Type.FAILED;
var state = State.Type.fail(this);
taskRun = taskRun
.withState(state)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))

View File

@@ -29,8 +29,13 @@ import java.util.concurrent.TimeUnit;
@Plugin(
examples = {
@Example(
full = true,
code = """
id: sleep
id: sleep
namespace: company.team
tasks:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: "PT5S"
"""

View File

@@ -238,7 +238,7 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
builder.outputs(outputs);
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
var state = this.isAllowFailure() ? this.isAllowWarning() ? State.Type.SUCCESS : State.Type.WARNING : State.Type.FAILED;
var state = State.Type.fail(this);
Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
taskRun = taskRun
.withState(state)

View File

@@ -82,12 +82,12 @@ import java.util.stream.Stream;
code = """
id: daily_flow
namespace: company.team
tasks:
- id: log
type: io.kestra.plugin.core.log.Log
message: It's {{ trigger.date ?? taskrun.startDate | date("HH:mm") }}
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
@@ -437,13 +437,6 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
Optional.empty()
);
execution = execution.toBuilder()
// keep to avoid breaking compatibility
.variables(ImmutableMap.of(
"schedule", execution.getTrigger().getVariables()
))
.build();
return Optional.of(execution);
}

View File

@@ -49,7 +49,6 @@ class DocumentationGeneratorTest {
assertThat(render).contains("description: \"Short description for this task\"");
assertThat(render).contains("`VALUE_1`");
assertThat(render).contains("`VALUE_2`");
assertThat(render).contains("This plugin is exclusively available on the Cloud and Enterprise editions of Kestra.");
}
@SuppressWarnings({"rawtypes", "unchecked"})

View File

@@ -1,6 +1,7 @@
package io.kestra.core.http.client;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.net.HttpHeaders;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
@@ -262,6 +263,30 @@ class HttpClientTest {
}
}
@Test
void postCustomObject_WithUnknownResponseField() throws IllegalVariableEvaluationException, HttpClientException, IOException {
CustomObject test = CustomObject.builder()
.id(IdUtils.create())
.name("test")
.build();
Map<String, String> withAdditionalField = JacksonMapper.ofJson().convertValue(test, new TypeReference<>() {
});
withAdditionalField.put("foo", "bar");
try (HttpClient client = client()) {
HttpResponse<CustomObject> response = client.request(
HttpRequest.of(URI.create(embeddedServerUri + "/http/json-post"), "POST", HttpRequest.JsonRequestBody.builder().content(withAdditionalField).build()),
CustomObject.class
);
assertThat(response.getStatus().getCode()).isEqualTo(200);
assertThat(response.getBody().id).isEqualTo(test.id);
assertThat(response.getHeaders().firstValue(HttpHeaders.CONTENT_TYPE).orElseThrow()).isEqualTo(MediaType.APPLICATION_JSON);
}
}
@Test
void postMultipart() throws IOException, URISyntaxException, IllegalVariableEvaluationException, HttpClientException {
Map<String, Object> multipart = Map.of(
@@ -509,4 +534,4 @@ class HttpClientTest {
String id;
String name;
}
}
}

View File

@@ -26,7 +26,7 @@ class MultiselectInputTest {
MultiselectInput input = MultiselectInput
.builder()
.id("id")
.expression("{{ values }}")
.expression("{{ values }}\n")
.build();
// When
Input<?> renderInput = RenderableInput.mayRenderInput(input, s -> {
@@ -60,4 +60,4 @@ class MultiselectInputTest {
// Then
Assertions.assertEquals(((MultiselectInput)renderInput).getValues(), List.of("1", "2"));
}
}
}

View File

@@ -26,7 +26,7 @@ class SelectInputTest {
SelectInput input = SelectInput
.builder()
.id("id")
.expression("{{ values }}")
.expression("{{ values }}\n")
.build();
// When
Input<?> renderInput = RenderableInput.mayRenderInput(input, s -> {
@@ -60,4 +60,4 @@ class SelectInputTest {
// Then
Assertions.assertEquals(((SelectInput)renderInput).getValues(), List.of("1", "2"));
}
}
}

View File

@@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import io.kestra.core.models.Plugin;
import io.kestra.core.plugins.PluginRegistry;
@@ -15,12 +16,14 @@ import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
@ExtendWith(MockitoExtension.class)
class PluginDeserializerTest {
@Mock
private PluginRegistry registry;
@Test
void shouldSucceededDeserializePluginGivenValidType() throws JsonProcessingException {
// Given
@@ -38,8 +41,9 @@ class PluginDeserializerTest {
TestPluginHolder deserialized = om.readValue(input, TestPluginHolder.class);
// Then
Assertions.assertEquals(TestPlugin.class.getCanonicalName(), deserialized.plugin().getType());
Mockito.verify(registry, Mockito.only()).findClassByIdentifier(identifier);
assertThat(deserialized.plugin().getType()).isEqualTo(TestPlugin.class.getCanonicalName());
Mockito.verify(registry, Mockito.times(1)).isVersioningSupported();
Mockito.verify(registry, Mockito.times(1)).findClassByIdentifier(identifier);
}
@Test
@@ -57,17 +61,33 @@ class PluginDeserializerTest {
});
// Then
Assertions.assertEquals("io.kestra.core.plugins.serdes.Unknown", exception.getTypeId());
assertThat("io.kestra.core.plugins.serdes.Unknown").isEqualTo(exception.getTypeId());
}
@Test
void shouldReturnNullPluginIdentifierGivenNullType() {
Assertions.assertNull(PluginDeserializer.extractPluginRawIdentifier(new TextNode(null)));
assertThat(PluginDeserializer.extractPluginRawIdentifier(new TextNode(null), true)).isNull();
}
@Test
void shouldReturnNullPluginIdentifierGivenEmptyType() {
Assertions.assertNull(PluginDeserializer.extractPluginRawIdentifier(new TextNode("")));
assertThat(PluginDeserializer.extractPluginRawIdentifier(new TextNode(""), true)).isNull();
}
@Test
void shouldReturnTypeWithVersionGivenSupportedVersionTrue() {
ObjectNode jsonNodes = new ObjectNode(new ObjectMapper().getNodeFactory());
jsonNodes.set("type", new TextNode("io.kestra.core.plugins.serdes.Unknown"));
jsonNodes.set("version", new TextNode("1.0.0"));
assertThat(PluginDeserializer.extractPluginRawIdentifier(jsonNodes, true)).isEqualTo("io.kestra.core.plugins.serdes.Unknown:1.0.0");
}
@Test
void shouldReturnTypeWithoutVersionGivenSupportedVersionFalse() {
ObjectNode jsonNodes = new ObjectNode(new ObjectMapper().getNodeFactory());
jsonNodes.set("type", new TextNode("io.kestra.core.plugins.serdes.Unknown"));
jsonNodes.set("version", new TextNode("1.0.0"));
assertThat(PluginDeserializer.extractPluginRawIdentifier(jsonNodes, false)).isEqualTo("io.kestra.core.plugins.serdes.Unknown");
}
public record TestPluginHolder(Plugin plugin) {

View File

@@ -492,6 +492,12 @@ public abstract class AbstractRunnerTest {
slaTestCase.executionConditionSLAShouldLabel();
}
@Test
@LoadFlows({"flows/valids/sla-parent-flow.yaml", "flows/valids/sla-subflow.yaml"})
void slaViolationOnSubflowMayEndTheParentFlow() throws Exception {
slaTestCase.slaViolationOnSubflowMayEndTheParentFlow();
}
@Test
@LoadFlows({"flows/valids/if.yaml"})
void multipleIf() throws TimeoutException, QueueException {

View File

@@ -48,4 +48,10 @@ public class SLATestCase {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getLabels()).contains(new Label("sla", "violated"));
}
public void slaViolationOnSubflowMayEndTheParentFlow() throws QueueException, TimeoutException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "sla-parent-flow");
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
}
}

View File

@@ -165,7 +165,7 @@ class YamlParserTest {
Flow flow = this.parse("flows/valids/minimal.yaml");
String s = MAPPER.writeValueAsString(flow);
assertThat(s).isEqualTo("{\"id\":\"minimal\",\"namespace\":\"io.kestra.tests\",\"revision\":2,\"disabled\":false,\"deleted\":false,\"labels\":[{\"key\":\"system.readOnly\",\"value\":\"true\"}],\"tasks\":[{\"id\":\"date\",\"type\":\"io.kestra.plugin.core.debug.Return\",\"format\":\"{{taskrun.startDate}}\"}]}");
assertThat(s).isEqualTo("{\"id\":\"minimal\",\"namespace\":\"io.kestra.tests\",\"revision\":2,\"disabled\":false,\"deleted\":false,\"labels\":[{\"key\":\"system.readOnly\",\"value\":\"true\"},{\"key\":\"existing\",\"value\":\"label\"}],\"tasks\":[{\"id\":\"date\",\"type\":\"io.kestra.plugin.core.debug.Return\",\"format\":\"{{taskrun.startDate}}\"}]}");
}
@Test

View File

@@ -0,0 +1,31 @@
package io.kestra.plugin.core.execution;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Test;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest(startRunner = true)
class SetVariablesTest {
@ExecuteFlow("flows/valids/set-variables.yaml")
@Test
void shouldUpdateExecution(Execution execution) {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getTaskRunList()).hasSize(2);
assertThat(((Map<String, Object>) execution.getTaskRunList().get(1).getOutputs().get("values"))).containsEntry("message", "Hello Loïc");
}
@ExecuteFlow("flows/valids/set-variables-duplicate.yaml")
@Test
void shouldFailWhenExistingVariable(Execution execution) {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(execution.getTaskRunList()).hasSize(1);
assertThat(execution.getTaskRunList().getFirst().getState().getCurrent()).isEqualTo(State.Type.FAILED);
}
}

View File

@@ -0,0 +1,23 @@
package io.kestra.plugin.core.execution;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import org.junit.jupiter.api.Test;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest(startRunner = true)
class UnsetVariablesTest {
@ExecuteFlow("flows/valids/unset-variables.yaml")
@Test
void shouldUpdateExecution(Execution execution) {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getTaskRunList()).hasSize(3);
assertThat(((Map<String, Object>) execution.getTaskRunList().get(2).getOutputs().get("values"))).containsEntry("message", "default");
}
}

View File

@@ -79,6 +79,12 @@ public class PauseTest {
suite.runTimeout(runnerUtils);
}
@Test
@LoadFlows({"flows/valids/pause-timeout-allow-failure.yaml"})
void timeoutAllowFailure() throws Exception {
suite.runTimeoutAllowFailure(runnerUtils);
}
@Test
@LoadFlows({"flows/valids/pause_no_tasks.yaml"})
void runEmptyTasks() throws Exception {
@@ -235,6 +241,25 @@ public class PauseTest {
assertThat(execution.getTaskRunList()).hasSize(1);
}
public void runTimeoutAllowFailure(RunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "pause-timeout-allow-failure", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.PAUSED);
assertThat(execution.getTaskRunList()).hasSize(1);
execution = runnerUtils.awaitExecution(
e -> e.getId().equals(executionId) && e.getState().getCurrent() == State.Type.WARNING,
() -> {},
Duration.ofSeconds(5)
);
assertThat(execution.getTaskRunList().getFirst().getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count()).as("Task runs were: " + execution.getTaskRunList().toString()).isEqualTo(1L);
assertThat(execution.getTaskRunList().getFirst().getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count()).isEqualTo(1L);
assertThat(execution.getTaskRunList().getFirst().getState().getHistories().stream().filter(history -> history.getState() == State.Type.WARNING).count()).isEqualTo(1L);
assertThat(execution.getTaskRunList()).hasSize(2);
}
public void runEmptyTasks(RunnerUtils runnerUtils) throws Exception {
Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "pause_no_tasks", null, null, Duration.ofSeconds(30));
String executionId = execution.getId();

View File

@@ -111,6 +111,12 @@ public class WorkingDirectoryTest {
suite.encryption(runnerUtils, runContextFactory);
}
@Test
@LoadFlows({"flows/valids/working-directory-invalid-runif.yaml"})
void invalidRunIf() throws Exception {
suite.invalidRunIf(runnerUtils);
}
@Singleton
public static class Suite {
@Inject
@@ -310,6 +316,15 @@ public class WorkingDirectoryTest {
assertThat(execution.findTaskRunsByTaskId("decrypted").getFirst().getOutputs().get("value")).isEqualTo("Hello World");
}
public void invalidRunIf(RunnerUtils runnerUtils) throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "working-directory-invalid-runif", null,
(f, e) -> ImmutableMap.of("failed", "false"), Duration.ofSeconds(60)
);
assertThat(execution.getTaskRunList()).hasSize(2);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
}
private void put(String path, String content) throws IOException {
put(path, content, "io.kestra.tests");
}

View File

@@ -49,7 +49,7 @@ class ScheduleTest {
@Test
void failed() throws Exception {
Schedule trigger = Schedule.builder().id("schedule").cron("1 1 1 1 1").build();
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("1 1 1 1 1").build();
Optional<Execution> evaluate = trigger.evaluate(
conditionContext(trigger),
@@ -82,9 +82,8 @@ class ScheduleTest {
}
@Test
@SuppressWarnings("unchecked")
void success() throws Exception {
Schedule trigger = Schedule.builder().id("schedule").cron("0 0 1 * *").build();
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 1 * *").build();
ZonedDateTime date = ZonedDateTime.now()
.withDayOfMonth(1)
@@ -103,12 +102,12 @@ class ScheduleTest {
assertThat(evaluate.get().getLabels()).hasSize(3);
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
var vars = (Map<String, String>) evaluate.get().getVariables().get("schedule");
var vars = evaluate.get().getTrigger().getVariables();
var inputs = evaluate.get().getInputs();
assertThat(dateFromVars(vars.get("date"), date)).isEqualTo(date);
assertThat(dateFromVars(vars.get("next"), date)).isEqualTo(date.plusMonths(1));
assertThat(dateFromVars(vars.get("previous"), date)).isEqualTo(date.minusMonths(1));
assertThat(dateFromVars((String) vars.get("date"), date)).isEqualTo(date);
assertThat(dateFromVars((String) vars.get("next"), date)).isEqualTo(date.plusMonths(1));
assertThat(dateFromVars((String) vars.get("previous"), date)).isEqualTo(date.minusMonths(1));
assertThat(evaluate.get().getLabels()).contains(new Label("flow-label-1", "flow-label-1"));
assertThat(evaluate.get().getLabels()).contains(new Label("flow-label-2", "flow-label-2"));
assertThat(inputs.size()).isEqualTo(2);
@@ -118,7 +117,7 @@ class ScheduleTest {
@Test
void successWithInput() throws Exception {
Schedule trigger = Schedule.builder().id("schedule").cron("0 0 1 * *").inputs(Map.of("input1", "input1")).build();
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 1 * *").inputs(Map.of("input1", "input1")).build();
ZonedDateTime date = ZonedDateTime.now()
.withDayOfMonth(1)
@@ -147,7 +146,7 @@ class ScheduleTest {
@Test
void success_withLabels() throws Exception {
var scheduleTrigger = Schedule.builder()
.id("schedule")
.id("schedule").type(Schedule.class.getName())
.cron("0 0 1 * *")
.labels(List.of(
new Label("trigger-label-1", "trigger-label-1"),
@@ -173,10 +172,9 @@ class ScheduleTest {
assertThat(evaluate.get().getLabels()).contains(new Label("trigger-label-3", ""));
}
@SuppressWarnings("unchecked")
@Test
void everyMinute() throws Exception {
Schedule trigger = Schedule.builder().id("schedule").cron("* * * * *").build();
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("* * * * *").build();
ZonedDateTime date = ZonedDateTime.now()
.minus(Duration.ofMinutes(1))
@@ -191,18 +189,16 @@ class ScheduleTest {
assertThat(evaluate.isPresent()).isTrue();
var vars = (Map<String, String>) evaluate.get().getVariables().get("schedule");
var vars = evaluate.get().getTrigger().getVariables();
assertThat(dateFromVars(vars.get("date"), date)).isEqualTo(date);
assertThat(dateFromVars(vars.get("next"), date)).isEqualTo(date.plus(Duration.ofMinutes(1)));
assertThat(dateFromVars(vars.get("previous"), date)).isEqualTo(date.minus(Duration.ofMinutes(1)));
assertThat(dateFromVars((String) vars.get("date"), date)).isEqualTo(date);
assertThat(dateFromVars((String) vars.get("next"), date)).isEqualTo(date.plus(Duration.ofMinutes(1)));
assertThat(dateFromVars((String) vars.get("previous"), date)).isEqualTo(date.minus(Duration.ofMinutes(1)));
}
@SuppressWarnings("unchecked")
@Test
void everySecond() throws Exception {
Schedule trigger = Schedule.builder().id("schedule").cron("* * * * * *").withSeconds(true).build();
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("* * * * * *").withSeconds(true).build();
ZonedDateTime date = ZonedDateTime.now()
.truncatedTo(ChronoUnit.SECONDS)
@@ -215,18 +211,17 @@ class ScheduleTest {
assertThat(evaluate.isPresent()).isTrue();
var vars = (Map<String, String>) evaluate.get().getVariables().get("schedule");
var vars = evaluate.get().getTrigger().getVariables();
assertThat(dateFromVars(vars.get("date"), date)).isEqualTo(date);
assertThat(dateFromVars(vars.get("next"), date)).isEqualTo(date.plus(Duration.ofSeconds(1)));
assertThat(dateFromVars(vars.get("previous"), date)).isEqualTo(date.minus(Duration.ofSeconds(1)));
assertThat(dateFromVars((String) vars.get("date"), date)).isEqualTo(date);
assertThat(dateFromVars((String) vars.get("next"), date)).isEqualTo(date.plus(Duration.ofSeconds(1)));
assertThat(dateFromVars((String) vars.get("previous"), date)).isEqualTo(date.minus(Duration.ofSeconds(1)));
}
@Test
void shouldNotReturnExecutionForBackFillWhenCurrentDateIsBeforeScheduleDate() throws Exception {
// Given
Schedule trigger = Schedule.builder().id("schedule").cron(TEST_CRON_EVERYDAY_AT_8).build();
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron(TEST_CRON_EVERYDAY_AT_8).build();
ZonedDateTime now = ZonedDateTime.now();
TriggerContext triggerContext = triggerContext(now, trigger).toBuilder()
.backfill(Backfill
@@ -246,7 +241,7 @@ class ScheduleTest {
void
shouldReturnExecutionForBackFillWhenCurrentDateIsAfterScheduleDate() throws Exception {
// Given
Schedule trigger = Schedule.builder().id("schedule").cron(TEST_CRON_EVERYDAY_AT_8).build();
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron(TEST_CRON_EVERYDAY_AT_8).build();
ZonedDateTime now = ZonedDateTime.of(2025, 1, 1, 0, 0, 0, 0, ZoneId.systemDefault());
TriggerContext triggerContext = triggerContext(ZonedDateTime.now(), trigger).toBuilder()
.backfill(Backfill
@@ -265,7 +260,7 @@ class ScheduleTest {
@Test
void noBackfillNextDate() {
Schedule trigger = Schedule.builder().id("schedule").cron("0 0 * * *").build();
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 * * *").build();
ZonedDateTime next = trigger.nextEvaluationDate(conditionContext(trigger), Optional.empty());
assertThat(next.getDayOfMonth()).isEqualTo(ZonedDateTime.now().plusDays(1).getDayOfMonth());
@@ -273,7 +268,7 @@ class ScheduleTest {
@Test
void noBackfillNextDateContext() {
Schedule trigger = Schedule.builder().id("schedule").cron("0 0 * * *").timezone("Europe/Paris").build();
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 * * *").timezone("Europe/Paris").build();
ZonedDateTime date = ZonedDateTime.parse("2020-01-01T00:00:00+01:00[Europe/Paris]");
ZonedDateTime next = trigger.nextEvaluationDate(conditionContext(trigger), Optional.of(triggerContext(date, trigger)));
@@ -281,9 +276,8 @@ class ScheduleTest {
}
@Test
@SuppressWarnings("unchecked")
void systemBackfillChangedFromCronExpression() throws Exception {
Schedule trigger = Schedule.builder().id("schedule").cron("30 0 1 * *").build();
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("30 0 1 * *").build();
ZonedDateTime date = ZonedDateTime.now()
.withDayOfMonth(1)
@@ -303,17 +297,16 @@ class ScheduleTest {
assertThat(evaluate.isPresent()).isTrue();
var vars = (Map<String, String>) evaluate.get().getVariables().get("schedule");
assertThat(dateFromVars(vars.get("date"), expexted)).isEqualTo(expexted);
assertThat(dateFromVars(vars.get("next"), expexted)).isEqualTo(expexted.plusMonths(1));
assertThat(dateFromVars(vars.get("previous"), expexted)).isEqualTo(expexted.minusMonths(1));
var vars = evaluate.get().getTrigger().getVariables();
assertThat(dateFromVars((String) vars.get("date"), expexted)).isEqualTo(expexted);
assertThat(dateFromVars((String) vars.get("next"), expexted)).isEqualTo(expexted.plusMonths(1));
assertThat(dateFromVars((String) vars.get("previous"), expexted)).isEqualTo(expexted.minusMonths(1));
}
@SuppressWarnings("unchecked")
@Test
void conditions() throws Exception {
Schedule trigger = Schedule.builder()
.id("schedule")
.id("schedule").type(Schedule.class.getName())
.type(Schedule.class.getName())
.cron("0 12 * * 1")
.timezone("Europe/Paris")
@@ -338,17 +331,16 @@ class ScheduleTest {
assertThat(evaluate.isPresent()).isTrue();
var vars = (Map<String, String>) evaluate.get().getVariables().get("schedule");
assertThat(dateFromVars(vars.get("date"), date)).isEqualTo(date);
assertThat(dateFromVars(vars.get("next"), next)).isEqualTo(next);
assertThat(dateFromVars(vars.get("previous"), previous)).isEqualTo(previous);
var vars = evaluate.get().getTrigger().getVariables();
assertThat(dateFromVars((String) vars.get("date"), date)).isEqualTo(date);
assertThat(dateFromVars((String) vars.get("next"), next)).isEqualTo(next);
assertThat(dateFromVars((String) vars.get("previous"), previous)).isEqualTo(previous);
}
@SuppressWarnings("unchecked")
@Test
void impossibleNextConditions() throws Exception {
Schedule trigger = Schedule.builder()
.id("schedule")
.id("schedule").type(Schedule.class.getName())
.type(Schedule.class.getName())
.cron("0 12 * * 1")
.timezone("Europe/Paris")
@@ -371,16 +363,16 @@ class ScheduleTest {
assertThat(evaluate.isPresent()).isTrue();
var vars = (Map<String, String>) evaluate.get().getVariables().get("schedule");
assertThat(dateFromVars(vars.get("date"), date)).isEqualTo(date);
assertThat(dateFromVars(vars.get("previous"), previous)).isEqualTo(previous);
var vars = evaluate.get().getTrigger().getVariables();
assertThat(dateFromVars((String) vars.get("date"), date)).isEqualTo(date);
assertThat(dateFromVars((String) vars.get("previous"), previous)).isEqualTo(previous);
assertThat(vars.containsKey("next")).isFalse();
}
@Test
void lateMaximumDelay() {
Schedule trigger = Schedule.builder()
.id("schedule")
.id("schedule").type(Schedule.class.getName())
.cron("* * * * *")
.lateMaximumDelay(Duration.ofMinutes(5))
.build();
@@ -401,17 +393,15 @@ class ScheduleTest {
}
@SuppressWarnings("unchecked")
@Test
void hourly() throws Exception {
Schedule trigger = Schedule.builder()
.id("schedule")
.id("schedule").type(Schedule.class.getName())
.cron("@hourly")
.build();
ZonedDateTime date = ZonedDateTime.now().minusHours(1).withMinute(0).withSecond(0).withNano(0);
Optional<Execution> evaluate = trigger.evaluate(
conditionContext(trigger),
TriggerContext.builder()
@@ -422,14 +412,13 @@ class ScheduleTest {
);
assertThat(evaluate.isPresent()).isTrue();
var vars = (Map<String, String>) evaluate.get().getVariables().get("schedule");
assertThat(dateFromVars(vars.get("date"), date)).isEqualTo(date);
var vars = evaluate.get().getTrigger().getVariables();
assertThat(dateFromVars((String) vars.get("date"), date)).isEqualTo(date);
}
@SuppressWarnings("unchecked")
@Test
void timezone() throws Exception {
Schedule trigger = Schedule.builder().id("schedule").cron("12 9 1 * *").timezone("America/New_York").build();
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("12 9 1 * *").timezone("America/New_York").build();
ZonedDateTime date = ZonedDateTime.now()
.withZoneSameLocal(ZoneId.of("America/New_York"))
@@ -449,18 +438,18 @@ class ScheduleTest {
assertThat(evaluate.isPresent()).isTrue();
var vars = (Map<String, String>) evaluate.get().getVariables().get("schedule");
var vars = evaluate.get().getTrigger().getVariables();
assertThat(dateFromVars(vars.get("date"), date)).isEqualTo(date);
assertThat(ZonedDateTime.parse(vars.get("date")).getZone().getId()).isEqualTo("-04:00");
assertThat(dateFromVars(vars.get("next"), date)).isEqualTo(date.plusMonths(1));
assertThat(dateFromVars(vars.get("previous"), date)).isEqualTo(date.minusMonths(1));
assertThat(dateFromVars((String) vars.get("date"), date)).isEqualTo(date);
assertThat(ZonedDateTime.parse((String) vars.get("date")).getZone().getId()).isEqualTo("-04:00");
assertThat(dateFromVars((String) vars.get("next"), date)).isEqualTo(date.plusMonths(1));
assertThat(dateFromVars((String) vars.get("previous"), date)).isEqualTo(date.minusMonths(1));
}
@Test
void timezone_with_backfile() throws Exception {
Schedule trigger = Schedule.builder()
.id("schedule")
.id("schedule").type(Schedule.class.getName())
.cron(TEST_CRON_EVERYDAY_AT_8)
.timezone("America/New_York")
.build();
@@ -480,8 +469,6 @@ class ScheduleTest {
assertThat(result.isPresent()).isTrue();
}
private ConditionContext conditionContext(AbstractTrigger trigger) {
Flow flow = Flow.builder()
.id(IdUtils.create())

View File

@@ -4,6 +4,7 @@ revision: 2
labels:
system.readOnly: "true"
existing: label
tasks:
- id: date

View File

@@ -0,0 +1,15 @@
id: pause-timeout-allow-failure
namespace: io.kestra.tests
tasks:
- id: pause
type: io.kestra.plugin.core.flow.Pause
timeout: PT1S
allowFailure: true
tasks:
- id: ko
type: io.kestra.plugin.core.log.Log
message: "trigger 1 seconds pause"
- id: last
type: io.kestra.plugin.core.debug.Return
format: "{{task.id}} > {{taskrun.startDate}}"

View File

@@ -0,0 +1,17 @@
id: set-variables-duplicate
namespace: io.kestra.tests
variables:
name: World
tasks:
- id: set-vars
type: io.kestra.plugin.core.execution.SetVariables
variables:
message: Hello
name: Loïc
overwrite: false
- id: output
type: io.kestra.plugin.core.output.OutputValues
values:
message: "{{ vars.message }} {{ vars.name }}"

View File

@@ -0,0 +1,16 @@
id: set-variables
namespace: io.kestra.tests
variables:
name: World
tasks:
- id: set-vars
type: io.kestra.plugin.core.execution.SetVariables
variables:
message: Hello
name: Loïc
- id: output
type: io.kestra.plugin.core.output.OutputValues
values:
message: "{{ vars.message }} {{ vars.name }}"

View File

@@ -0,0 +1,8 @@
id: sla-parent-flow
namespace: io.kestra.tests
tasks:
- id: subflow
type: io.kestra.plugin.core.flow.Subflow
namespace: io.kestra.tests
flowId: sla-subflow

View File

@@ -0,0 +1,13 @@
id: sla-subflow
namespace: io.kestra.tests
sla:
- id: maxDuration
type: MAX_DURATION
duration: PT10S
behavior: FAIL
tasks:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: PT60S

View File

@@ -4,4 +4,10 @@ namespace: io.kestra.tests
tasks:
- id: sleep-long
type: io.kestra.plugin.core.flow.Sleep
duration: PT300S
duration: PT300S
afterExecution:
- id: output
type: io.kestra.plugin.core.output.OutputValues
values:
state: "{{execution.state}}"

View File

@@ -0,0 +1,16 @@
id: unset-variables
namespace: io.kestra.tests
tasks:
- id: set-vars
type: io.kestra.plugin.core.execution.SetVariables
variables:
message: Hello World
- id: unset-variables
type: io.kestra.plugin.core.execution.UnsetVariables
variables:
- message
- id: output
type: io.kestra.plugin.core.output.OutputValues
values:
message: "{{ vars.message ??? 'default' }}"

View File

@@ -0,0 +1,15 @@
id: working-directory-invalid-runif
namespace: io.kestra.tests
tasks:
- id: workingDirectory
type: io.kestra.plugin.core.flow.WorkingDirectory
tasks:
- id: s1
type: io.kestra.plugin.core.debug.Return
format: 1
- id: s2
type: io.kestra.plugin.core.debug.Return
runIf: "{{ outputs.failed }}"
format: 2

View File

@@ -0,0 +1,200 @@
#!/bin/bash
#===============================================================================
# SCRIPT: update-plugin-kestra-version.sh
#
# DESCRIPTION:
# This script can be used to update the gradle 'kestraVersion' property on each kestra plugin repository.
# By default, if no `GITHUB_PAT` environment variable exists, the script will attempt to clone GitHub repositories over SSH.
#
# USAGE:
# ./dev-tools/update-plugin-kestra-version.sh --branch <branch> --version <version> [plugin-repositories...]
#
# OPTIONS:
# --branch <branch> Specify the branch on which to update the kestraVersion (default: master).
# --version <version> Specify the Kestra core version (required).
# --plugin-file File containing the plugin list (default: .plugins)
# --dry-run Specify to run in DRY_RUN.
# -y, --yes Automatically confirm prompts (non-interactive).
# -h, --help Show this help message and exit.
# EXAMPLES:
# To release all plugins:
# ./update-plugin-kestra-version.sh --branch=releases/v0.23.x --version="[0.23,0.24)"
# To release a specific plugin:
# ./update-plugin-kestra-version.sh --branch=releases/v0.23.x --version="[0.23,0.24)" plugin-kubernetes
# To release specific plugins from file:
# ./update-plugin-kestra-version.sh --branch=releases/v0.23.x --version="[0.23,0.24)" --plugin-file .plugins
#===============================================================================
set -e;
###############################################################
# Global vars
###############################################################
BASEDIR=$(dirname "$(readlink -f $0)")
SCRIPT_NAME=$(basename "$0")
SCRIPT_NAME="${SCRIPT_NAME%.*}"
WORKING_DIR="/tmp/kestra-$SCRIPT_NAME-$(date +%s)"
PLUGIN_FILE="$BASEDIR/../.plugins"
GIT_BRANCH=master
###############################################################
# Functions
###############################################################
# Function to display the help message
usage() {
echo "Usage: $0 --branch <branch> --version <version> [plugin-repositories...]"
echo
echo "Options:"
echo " --branch <branch> Specify the branch on which to update the kestraCoreVersion (default: master)."
echo " --version <version> Specify the Kestra core version (required)."
echo " --plugin-file File containing the plugin list (default: .plugins)"
echo " --dry-run Specify to run in DRY_RUN."
echo " -y, --yes Automatically confirm prompts (non-interactive)."
echo " -h, --help Show this help message and exit."
exit 1
}
# Function to ask to continue
function askToContinue() {
read -p "Are you sure you want to continue? [y/N] " confirm
[[ "$confirm" =~ ^[Yy]$ ]] || { echo "Operation cancelled."; exit 1; }
}
###############################################################
# Options
###############################################################
PLUGINS_ARGS=()
AUTO_YES=false
DRY_RUN=false
# Get the options
while [[ "$#" -gt 0 ]]; do
case "$1" in
--branch)
GIT_BRANCH="$2"
shift 2
;;
--branch=*)
GIT_BRANCH="${1#*=}"
shift
;;
--version)
VERSION="$2"
shift 2
;;
--version=*)
VERSION="${1#*=}"
shift
;;
--plugin-file)
PLUGIN_FILE="$2"
shift 2
;;
--plugin-file=*)
PLUGIN_FILE="${1#*=}"
shift
;;
--dry-run)
DRY_RUN=true
shift
;;
-y|--yes)
AUTO_YES=true
shift
;;
-h|--help)
usage
;;
*)
PLUGINS_ARGS+=("$1")
shift
;;
esac
done
## Check options
if [[ -z "$VERSION" ]]; then
echo -e "Missing required argument: --version\n";
usage
fi
## Get plugin list
if [[ "${#PLUGINS_ARGS[@]}" -eq 0 ]]; then
if [ -f "$PLUGIN_FILE" ]; then
PLUGINS=$(cat "$PLUGIN_FILE" | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | cut -d':' -f1 | uniq | sort);
PLUGINS_COUNT=$(echo "$PLUGINS" | wc -l);
PLUGINS_ARRAY=$(echo "$PLUGINS" | xargs || echo '');
PLUGINS_ARRAY=($PLUGINS_ARRAY);
fi
else
PLUGINS_ARRAY=("${PLUGINS_ARGS[@]}")
PLUGINS_COUNT="${#PLUGINS_ARGS[@]}"
fi
## Print summary and ask for confirmation
echo "VERSION=$VERSION"
echo "GIT_BRANCH=$GIT_BRANCH"
echo "DRY_RUN=$DRY_RUN"
echo "Found ($PLUGINS_COUNT) plugin repositories:";
for PLUGIN in "${PLUGINS_ARRAY[@]}"; do
echo "$PLUGIN"
done
if [[ "$AUTO_YES" == false ]]; then
askToContinue
fi
###############################################################
# Main
###############################################################
mkdir -p $WORKING_DIR
COUNTER=1;
for PLUGIN in "${PLUGINS_ARRAY[@]}"
do
cd $WORKING_DIR;
echo "---------------------------------------------------------------------------------------"
echo "[$COUNTER/$PLUGINS_COUNT] Update Plugin: $PLUGIN"
echo "---------------------------------------------------------------------------------------"
if [[ -z "${GITHUB_PAT}" ]]; then
git clone git@github.com:kestra-io/$PLUGIN
else
echo "Clone git repository using GITHUB PAT"
git clone https://${GITHUB_PAT}@github.com/kestra-io/$PLUGIN.git
fi
cd "$PLUGIN";
if [[ "$PLUGIN" == "plugin-transform" ]] && [[ "$GIT_BRANCH" == "master" ]]; then # quickfix
git checkout main;
else
git checkout "$GIT_BRANCH";
fi
CURRENT_BRANCH=$(git branch --show-current);
echo "Update kestraVersion for plugin: $PLUGIN on branch $CURRENT_BRANCH:";
# Update the kestraVersion property
sed -i "s/^kestraVersion=.*/kestraVersion=${VERSION}/" ./gradle.properties
# Display diff
git diff --exit-code --unified=0 ./gradle.properties | grep -E '^\+|^-' | grep -v -E '^\+\+\+|^---'
if [[ "$DRY_RUN" == false ]]; then
if [[ "$AUTO_YES" == false ]]; then
askToContinue
fi
git add ./gradle.properties
git commit -m"chore(deps): update kestraVersion to ${VERSION}."
git push
else
echo "Skip git commit/push [DRY_RUN=true]";
fi
COUNTER=$(( COUNTER + 1 ));
done;
exit 0;

View File

@@ -1,6 +1,6 @@
version=0.24.0-SNAPSHOT
version=0.23.6
org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
org.gradle.parallel=true
org.gradle.caching=true
org.gradle.priority=low
org.gradle.priority=low

View File

@@ -39,4 +39,13 @@ public class H2TenantMigration extends AbstractJdbcTenantMigration {
return context.execute(query);
}
@Override
protected int deleteTutorialFlows(Table<?> table, DSLContext context) {
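// The namespace is extracted from the JSON "value" column via JQ_STRING, unlike the default
// implementation which filters on a plain namespace column.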
String query = """
DELETE FROM "%s"
WHERE JQ_STRING("value", '.namespace') = ?
""".formatted(table.getName());
return context.execute(query, "tutorial");
}
}

View File

@@ -1,5 +1,3 @@
publishSonatypePublicationPublicationToSonatypeRepository.enabled = false
configurations {
compileClasspath.extendsFrom(micronaut)
}

View File

@@ -1,5 +1,3 @@
publishSonatypePublicationPublicationToSonatypeRepository.enabled = false
configurations {
implementation.extendsFrom(micronaut)
}

View File

@@ -7,9 +7,9 @@ import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.WorkerJobRunningStateStore;
import io.kestra.core.runners.WorkerTriggerResult;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.jdbc.runner.JdbcQueue;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
@@ -30,7 +30,8 @@ public class JdbcWorkerTriggerResultQueueService implements Closeable {
private final JdbcQueue<WorkerTriggerResult> workerTriggerResultQueue;
@Inject
private AbstractJdbcWorkerJobRunningRepository jdbcWorkerJobRunningRepository;
private WorkerJobRunningStateStore workerJobRunningStateStore;
private final AtomicReference<Runnable> disposable = new AtomicReference<>();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -52,14 +53,14 @@ public class JdbcWorkerTriggerResultQueueService implements Closeable {
try {
JsonNode json = MAPPER.readTree(either.getRight().getRecord());
var triggerContext = MAPPER.treeToValue(json.get("triggerContext"), TriggerContext.class);
jdbcWorkerJobRunningRepository.deleteByKey(triggerContext.uid());
workerJobRunningStateStore.deleteByKey(triggerContext.uid());
} catch (JsonProcessingException | DeserializationException e) {
// ignore the message if we cannot do anything about it
log.error("Unexpected exception when trying to handle a deserialization error", e);
}
} else {
WorkerTriggerResult workerTriggerResult = either.getLeft();
jdbcWorkerJobRunningRepository.deleteByKey(workerTriggerResult.getTriggerContext().uid());
workerJobRunningStateStore.deleteByKey(workerTriggerResult.getTriggerContext().uid());
}
consumer.accept(either);
});

View File

@@ -45,6 +45,15 @@ public abstract class AbstractJdbcTenantMigration implements TenantMigrationInte
}
if (!dryRun) {
if ("flows".equalsIgnoreCase(table.getName()) || "triggers".equalsIgnoreCase(table.getName())){
log.info("🔸 Delete tutorial flows to prevent duplication");
int deleted = dslContextWrapper.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
return deleteTutorialFlows(table, context);
});
log.info("✅ {} tutorial flows have been deleted", deleted);
}
int updated;
if (tableWithKey(table.getName())){
updated = dslContextWrapper.transactionResult(configuration -> {
@@ -93,4 +102,9 @@ public abstract class AbstractJdbcTenantMigration implements TenantMigrationInte
protected abstract int updateTenantIdFieldAndKey(Table<?> table, DSLContext context);
protected int deleteTutorialFlows(Table<?> table, DSLContext context){
String query = "DELETE FROM %s WHERE namespace = ?".formatted(table.getName());
return context.execute(query, "tutorial");
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.jdbc.repository;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.repositories.WorkerJobRunningRepositoryInterface;
import io.kestra.core.runners.WorkerJobRunning;
import io.kestra.core.runners.WorkerJobRunningStateStore;
import lombok.extern.slf4j.Slf4j;
import org.jooq.DSLContext;
import org.jooq.Record1;
@@ -13,7 +14,7 @@ import java.util.List;
import java.util.Optional;
@Slf4j
public abstract class AbstractJdbcWorkerJobRunningRepository extends AbstractJdbcRepository implements WorkerJobRunningRepositoryInterface {
public abstract class AbstractJdbcWorkerJobRunningRepository extends AbstractJdbcRepository implements WorkerJobRunningRepositoryInterface, WorkerJobRunningStateStore {
protected io.kestra.jdbc.AbstractJdbcRepository<WorkerJobRunning> jdbcRepository;
public AbstractJdbcWorkerJobRunningRepository(io.kestra.jdbc.AbstractJdbcRepository<WorkerJobRunning> jdbcRepository) {
@@ -26,9 +27,15 @@ public abstract class AbstractJdbcWorkerJobRunningRepository extends AbstractJdb
}
@Override
public void deleteByKey(String uid) {
Optional<WorkerJobRunning> workerJobRunning = this.findByKey(uid);
workerJobRunning.ifPresent(jobRunning -> this.jdbcRepository.delete(jobRunning));
public void deleteByKey(String key) {
this.jdbcRepository.getDslContextWrapper()
.transaction(configuration ->
DSL
.using(configuration)
.deleteFrom(this.jdbcRepository.getTable())
.where(field("key").eq(key))
.execute()
);
}
@Override

View File

@@ -24,13 +24,15 @@ public abstract class AbstractJdbcExecutionDelayStorage extends AbstractJdbcRepo
this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
SelectConditionStep<Record1<Object>> select = DSL
var select = DSL
.using(configuration)
.select(AbstractJdbcRepository.field("value"))
.from(this.jdbcRepository.getTable())
.where(
AbstractJdbcRepository.field("date").lessOrEqual(now.toOffsetDateTime())
);
)
.forUpdate()
.skipLocked();
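// FOR UPDATE ... SKIP LOCKED: rows already locked by another executor instance are skipped,
// so the same delayed execution is not processed twice concurrently.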
this.jdbcRepository.fetch(select)
.forEach(executionDelay -> {

View File

@@ -49,7 +49,9 @@ public abstract class AbstractJdbcSLAMonitorStorage extends AbstractJdbcReposito
DSLContext context = DSL.using(configuration);
var select = context.select()
.from(this.jdbcRepository.getTable())
.where(field("deadline").lt(date));
.where(field("deadline").lt(date))
.forUpdate()
.skipLocked();
this.jdbcRepository.fetch(select)
.forEach(slaMonitor -> {

View File

@@ -200,6 +200,8 @@ public class JdbcExecutor implements ExecutorInterface, Service {
private final AbstractJdbcFlowTopologyRepository flowTopologyRepository;
private final MaintenanceService maintenanceService;
private final String id = IdUtils.create();
private final AtomicBoolean shutdown = new AtomicBoolean(false);
@@ -229,13 +231,15 @@ public class JdbcExecutor implements ExecutorInterface, Service {
final AbstractJdbcFlowTopologyRepository flowTopologyRepository,
final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher,
final TracerFactory tracerFactory,
final ExecutorsUtils executorsUtils
final ExecutorsUtils executorsUtils,
final MaintenanceService maintenanceService
) {
this.serviceLivenessCoordinator = serviceLivenessCoordinator;
this.flowMetaStore = flowMetaStore;
this.flowTopologyRepository = flowTopologyRepository;
this.eventPublisher = eventPublisher;
this.tracer = tracerFactory.getTracer(JdbcExecutor.class, "EXECUTOR");
this.maintenanceService = maintenanceService;
// By default, we start half-available processors count threads with a minimum of 4 by executor service
// for the worker task result queue and the execution queue.
@@ -389,7 +393,12 @@ public class JdbcExecutor implements ExecutorInterface, Service {
}
));
setState(ServiceState.RUNNING);
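// if the cluster is already in maintenance mode at startup, begin in the paused MAINTENANCE state instead of RUNNING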
if (this.maintenanceService.isInMaintenanceMode()) {
enterMaintenance();
} else {
setState(ServiceState.RUNNING);
}
log.info("Executor started with {} thread(s)", numberOfThreads);
}
@@ -402,29 +411,33 @@ public class JdbcExecutor implements ExecutorInterface, Service {
ClusterEvent clusterEvent = either.getLeft();
log.info("Cluster event received: {}", clusterEvent);
switch (clusterEvent.eventType()) {
case MAINTENANCE_ENTER -> {
this.executionQueue.pause();
this.workerTaskResultQueue.pause();
this.killQueue.pause();
this.subflowExecutionResultQueue.pause();
this.flowQueue.pause();
this.isPaused.set(true);
this.setState(ServiceState.MAINTENANCE);
}
case MAINTENANCE_EXIT -> {
this.executionQueue.resume();
this.workerTaskResultQueue.resume();
this.killQueue.resume();
this.subflowExecutionResultQueue.resume();
this.flowQueue.resume();
this.isPaused.set(false);
this.setState(ServiceState.RUNNING);
}
case MAINTENANCE_ENTER -> enterMaintenance();
case MAINTENANCE_EXIT -> exitMaintenance();
}
}
private void enterMaintenance() {
this.executionQueue.pause();
this.workerTaskResultQueue.pause();
this.killQueue.pause();
this.subflowExecutionResultQueue.pause();
this.flowQueue.pause();
this.isPaused.set(true);
this.setState(ServiceState.MAINTENANCE);
}
private void exitMaintenance() {
this.executionQueue.resume();
this.workerTaskResultQueue.resume();
this.killQueue.resume();
this.subflowExecutionResultQueue.resume();
this.flowQueue.resume();
this.isPaused.set(false);
this.setState(ServiceState.RUNNING);
}
void reEmitWorkerJobsForWorkers(final Configuration configuration,
final List<String> ids) {
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_WORKER_JOB_RESUBMIT_COUNT, MetricRegistry.METRIC_EXECUTOR_WORKER_JOB_RESUBMIT_COUNT_DESCRIPTION)
@@ -718,22 +731,6 @@ public class JdbcExecutor implements ExecutorInterface, Service {
try {
// process worker task result
executorService.addWorkerTaskResult(current, () -> findFlow(execution), message);
// send metrics on terminated
TaskRun taskRun = message.getTaskRun();
if (taskRun.getState().isTerminated()) {
metricRegistry
.counter(MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT, MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT_DESCRIPTION, metricRegistry.tags(message))
.increment();
metricRegistry
.timer(MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION, MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION_DESCRIPTION, metricRegistry.tags(message))
.record(taskRun.getState().getDuration());
log.trace("TaskRun terminated: {}", taskRun);
workerJobRunningRepository.deleteByKey(taskRun.getId());
}
// join worker result
return Pair.of(
current,
@@ -1166,7 +1163,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
try {
// Handle paused tasks
if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESUME_FLOW)) {
if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESUME_FLOW) && !pair.getLeft().getState().isTerminated()) {
FlowInterface flow = flowMetaStore.findByExecution(pair.getLeft()).orElseThrow();
if (executionDelay.getTaskRunId() == null) {
// if taskRunId is null, this means we restart a flow that was delayed at startup (scheduled on)
@@ -1224,8 +1221,8 @@ public class JdbcExecutor implements ExecutorInterface, Service {
slaMonitorStorage.processExpired(Instant.now(), slaMonitor -> {
Executor result = executionRepository.lock(slaMonitor.getExecutionId(), pair -> {
Executor executor = new Executor(pair.getLeft(), null);
FlowInterface flow = flowMetaStore.findByExecution(pair.getLeft()).orElseThrow();
FlowWithSource flow = findFlow(pair.getLeft());
Executor executor = new Executor(pair.getLeft(), null).withFlow(flow);
Optional<SLA> sla = flow.getSla().stream().filter(s -> s.getId().equals(slaMonitor.getSlaId())).findFirst();
if (sla.isEmpty()) {
// this can happen in case the flow has been updated and the SLA removed

View File

@@ -1,6 +1,5 @@
plugins {
id 'java-platform'
id 'maven-publish'
}
group = 'io.kestra'
@@ -143,14 +142,4 @@ dependencies {
api "io.kestra:runner-memory:$version"
api "io.kestra:storage-local:$version"
}
}
publishing {
publications {
sonatypePublication(MavenPublication) {
groupId project.group
artifactId project.name
from components.javaPlatform
}
}
}

View File

@@ -398,7 +398,7 @@ public class Docker extends TaskRunner<Docker.DockerTaskRunnerDetailResult> {
String remotePath = windowsToUnixPath(taskCommands.getWorkingDirectory().toString());
// first, create an archive
Path fileArchive = runContext.workingDir().createFile("inputFiles.tart");
Path fileArchive = runContext.workingDir().createFile("inputFiles.tar");
try (FileOutputStream fos = new FileOutputStream(fileArchive.toString());
TarArchiveOutputStream out = new TarArchiveOutputStream(fos)) {
out.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX); // allow long file name
@@ -827,8 +827,23 @@ public class Docker extends TaskRunner<Docker.DockerTaskRunnerDetailResult> {
.longValue();
}
private String getImageNameWithoutTag(String fullImageName) {
if (fullImageName == null || fullImageName.isEmpty()) {
return fullImageName;
}
int lastColonIndex = fullImageName.lastIndexOf(':');
int firstSlashIndex = fullImageName.indexOf('/');
if (lastColonIndex > -1 && (firstSlashIndex == -1 || lastColonIndex > firstSlashIndex)) {
return fullImageName.substring(0, lastColonIndex);
} else {
return fullImageName; // No tag found or the colon is part of the registry host
}
}
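// Illustrative results of the parsing above, assuming standard image references:
//   "ghcr.io/kestra-io/kestrapy:latest" -> "ghcr.io/kestra-io/kestrapy"
//   "rockylinux:9.3-minimal"            -> "rockylinux"
//   "localhost:5000/myimage"            -> unchanged (the colon belongs to the registry host)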
private void pullImage(DockerClient dockerClient, String image, PullPolicy policy, Logger logger) {
NameParser.ReposTag imageParse = NameParser.parseRepositoryTag(image);
var imageNameWithoutTag = getImageNameWithoutTag(image);
var parsedTagFromImage = NameParser.parseRepositoryTag(image);
if (policy.equals(PullPolicy.IF_NOT_PRESENT)) {
try {
@@ -839,7 +854,9 @@ public class Docker extends TaskRunner<Docker.DockerTaskRunnerDetailResult> {
}
}
try (PullImageCmd pull = dockerClient.pullImageCmd(image)) {
// pullImageCmd without the tag (= repository) to avoid being redundant with withTag below
// and prevent errors with Podman trying to pull "image:tag:tag"
try (var pull = dockerClient.pullImageCmd(imageNameWithoutTag)) {
new RetryUtils().<Boolean, InternalServerErrorException>of(
Exponential.builder()
.delayFactor(2.0)
@@ -851,8 +868,8 @@ public class Docker extends TaskRunner<Docker.DockerTaskRunnerDetailResult> {
(bool, throwable) -> throwable instanceof InternalServerErrorException ||
throwable.getCause() instanceof ConnectionClosedException,
() -> {
String tag = !imageParse.tag.isEmpty() ? imageParse.tag : "latest";
String repository = pull.getRepository().contains(":") ? pull.getRepository().split(":")[0] : pull.getRepository();
var tag = !parsedTagFromImage.tag.isEmpty() ? parsedTagFromImage.tag : "latest";
var repository = pull.getRepository().contains(":") ? pull.getRepository().split(":")[0] : pull.getRepository();
pull
.withTag(tag)
.exec(new PullImageResultCallback())

View File

@@ -1,12 +1,40 @@
package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.runners.AbstractTaskRunnerTest;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
class DockerTest extends AbstractTaskRunnerTest {
@Override
protected TaskRunner<?> taskRunner() {
return Docker.builder().image("rockylinux:9.3-minimal").build();
}
@Test
void shouldNotHaveTagInDockerPullButJustInWithTag() throws Exception {
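// pulling an image that has both a registry host and a tag, with PullPolicy.ALWAYS, exercises the repository/tag split done in pullImage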
var runContext = runContext(this.runContextFactory);
var docker = Docker.builder()
.image("ghcr.io/kestra-io/kestrapy:latest")
.pullPolicy(Property.ofValue(PullPolicy.ALWAYS))
.build();
var taskCommands = new CommandsWrapper(runContext).withCommands(Property.ofValue(List.of(
"/bin/sh", "-c",
"echo Hello World!"
)));
var result = docker.run(runContext, taskCommands, Collections.emptyList());
assertThat(result).isNotNull();
assertThat(result.getExitCode()).isZero();
Assertions.assertThat(result.getLogConsumer().getStdOutCount()).isEqualTo(1);
}
}

View File

@@ -32,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
@KestraTest
public abstract class AbstractTaskRunnerTest {
@Inject private TestRunContextFactory runContextFactory;
@Inject protected TestRunContextFactory runContextFactory;
@Inject private StorageInterface storage;
@Test
@@ -81,9 +81,11 @@ public abstract class AbstractTaskRunnerTest {
@Test
protected void inputAndOutputFiles() throws Exception {
RunContext runContext = runContext(this.runContextFactory, Map.of("internalStorageFile", "kestra://some/internalStorage.txt"));
RunContext runContext = runContext(this.runContextFactory, Map.of("internalStorageFile", "kestra:///internalStorage.txt"));
var commands = initScriptCommands(runContext);
Mockito.when(commands.relativeWorkingDirectoryFilesPaths()).thenCallRealMethod();
Mockito.when(commands.relativeWorkingDirectoryFilesPaths(false)).thenCallRealMethod();
// Generate internal storage file
FileUtils.writeStringToFile(Path.of("/tmp/unittest/main/internalStorage.txt").toFile(), "Hello from internal storage", StandardCharsets.UTF_8);

View File

@@ -1,25 +1,33 @@
import type {StorybookConfig} from "@storybook/vue3-vite";
import path from "path";
const config: StorybookConfig = {
stories: [
"../tests/**/*.stories.@(js|jsx|mjs|ts|tsx)"
],
addons: [
"@storybook/addon-essentials",
"@storybook/addon-themes",
"@storybook/experimental-addon-test"
],
framework: {
name: "@storybook/vue3-vite",
options: {},
},
async viteFinal(config) {
const {default: viteJSXPlugin} = await import("@vitejs/plugin-vue-jsx")
config.plugins = [
...(config.plugins ?? []),
viteJSXPlugin(),
];
return config;
},
stories: [
"../tests/**/*.stories.@(js|jsx|mjs|ts|tsx)"
],
addons: [
"@storybook/addon-themes",
"@storybook/addon-vitest",
],
framework: {
name: "@storybook/vue3-vite",
options: {},
},
async viteFinal(config) {
const {default: viteJSXPlugin} = await import("@vitejs/plugin-vue-jsx")
config.plugins = [
...(config.plugins ?? []),
viteJSXPlugin(),
];
if (config.resolve) {
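// redirect the "override/services/filterLanguagesProvider" import to the Storybook mock so stories do not rely on the real provider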
config.resolve.alias = {
"override/services/filterLanguagesProvider": path.resolve(__dirname, "../tests/storybook/mocks/services/filterLanguagesProvider.mock.ts"),
...config.resolve?.alias
};
}
return config;
},
};
export default config;

View File

@@ -1,4 +1,4 @@
import {setup} from "@storybook/vue3";
import {setup} from "@storybook/vue3-vite";
import {withThemeByClassName} from "@storybook/addon-themes";
import initApp from "../src/utils/init";
import stores from "../src/stores/store";
@@ -11,7 +11,7 @@ window.KESTRA_BASE_PATH = "/ui";
window.KESTRA_UI_PATH = "./";
/**
* @type {import('@storybook/vue3').Preview}
* @type {import('@storybook/vue3-vite').Preview}
*/
const preview = {
parameters: {

View File

@@ -0,0 +1,42 @@
import path from "node:path";
import {defineProject, mergeConfig} from "vitest/config";
import {storybookTest} from "@storybook/addon-vitest/vitest-plugin";
import initialConfig from "../vite.config.js"
// More info at: https://storybook.js.org/docs/writing-tests/test-addon
export default mergeConfig(
// We define a separate project first to set up the alias for the filterLanguagesProvider mock, because otherwise the `override` alias would take precedence over this one (first-match rule)
defineProject({
resolve: {
alias: {
"override/services/filterLanguagesProvider": path.resolve(__dirname, "../tests/storybook/mocks/services/filterLanguagesProvider.mock.ts")
}
}
}),
mergeConfig(
initialConfig,
defineProject({
plugins: [
// The plugin will run tests for the stories defined in your Storybook config
// See options at: https://storybook.js.org/docs/writing-tests/test-addon#storybooktest
storybookTest({configDir: path.join(__dirname)}),
],
test: {
name: "storybook",
browser: {
enabled: true,
headless: true,
provider: "playwright",
instances: [{browser: "chromium"}],
},
setupFiles: ["vitest.setup.ts"],
},
define: {
"process.env.RUN_TEST_WITH_PERSISTENT": JSON.stringify("false"), // Disable persistent mode for tests
}
}),
),
);

View File

@@ -1,5 +1,5 @@
import {beforeAll} from "vitest";
import {setProjectAnnotations} from "@storybook/vue3";
import {setProjectAnnotations} from "@storybook/vue3-vite";
import * as projectAnnotations from "./preview";
// This is an important step to apply the right configuration when testing your stories.

View File

@@ -20,7 +20,7 @@ export default [
"**/*.spec.ts",
"vite.config.js",
"vitest.config.js",
"vitest.workspace.js",
".storybook/vitest.config.js",
],
languageOptions: {globals: globals.node},
},

ui/package-lock.json (generated, 3124 lines changed)

File diff suppressed because it is too large.

View File

@@ -22,22 +22,22 @@
},
"dependencies": {
"@js-joda/core": "^5.6.5",
"@kestra-io/ui-libs": "^0.0.203",
"@kestra-io/ui-libs": "^0.0.205",
"@vue-flow/background": "^1.3.2",
"@vue-flow/controls": "^1.1.2",
"@vue-flow/core": "^1.44.0",
"@vueuse/core": "^13.2.0",
"@vue-flow/core": "^1.45.0",
"@vueuse/core": "^13.3.0",
"ansi-to-html": "^0.7.2",
"axios": "^1.9.0",
"bootstrap": "^5.3.6",
"buffer": "^6.0.3",
"chart.js": "^4.4.9",
"core-js": "^3.42.0",
"core-js": "^3.43.0",
"cronstrue": "^2.61.0",
"dagre": "^0.8.5",
"el-table-infinite-scroll": "^3.0.6",
"element-plus": "^2.9.10",
"humanize-duration": "^3.32.2",
"element-plus": "^2.10.2",
"humanize-duration": "^3.33.0",
"js-yaml": "^4.1.0",
"lodash": "^4.17.21",
"markdown-it": "^14.1.0",
@@ -50,21 +50,21 @@
"md5": "^2.3.0",
"moment": "^2.30.1",
"moment-range": "^4.0.2",
"moment-timezone": "^0.5.48",
"moment-timezone": "^0.5.46",
"nprogress": "^0.2.0",
"path-browserify": "^1.0.1",
"pdfjs-dist": "^5.2.133",
"posthog-js": "^1.245.1",
"pdfjs-dist": "^5.3.31",
"posthog-js": "^1.250.1",
"rapidoc": "^9.3.8",
"semver": "^7.7.2",
"shiki": "^3.4.2",
"shiki": "^3.6.0",
"splitpanes": "^3.2.0",
"throttle-debounce": "^5.0.2",
"vue": "^3.5.13",
"vue": "^3.5.16",
"vue-axios": "^3.5.2",
"vue-chartjs": "^5.3.2",
"vue-gtag": "^2.1.0",
"vue-i18n": "^11.1.3",
"vue-i18n": "^11.1.5",
"vue-material-design-icons": "^5.3.1",
"vue-router": "^4.5.1",
"vue-sidebar-menu": "^5.7.0",
@@ -80,60 +80,59 @@
"@esbuild-plugins/node-modules-polyfill": "^0.2.2",
"@eslint/js": "^9.27.0",
"@rushstack/eslint-patch": "^1.11.0",
"@shikijs/markdown-it": "^3.4.2",
"@storybook/addon-essentials": "^8.6.14",
"@storybook/addon-themes": "^8.6.14",
"@storybook/blocks": "^8.6.14",
"@storybook/experimental-addon-test": "^8.6.14",
"@storybook/test": "^8.6.14",
"@storybook/test-runner": "^0.22.0",
"@storybook/vue3": "^8.6.14",
"@storybook/vue3-vite": "^8.6.14",
"@shikijs/markdown-it": "^3.6.0",
"@storybook/addon-themes": "^9.0.8",
"@storybook/addon-vitest": "^9.0.8",
"@storybook/test-runner": "^0.22.1",
"@storybook/vue3-vite": "^9.0.8",
"@types/humanize-duration": "^3.27.4",
"@types/js-yaml": "^4.0.9",
"@types/moment": "^2.11.29",
"@types/path-browserify": "^1.0.3",
"@typescript-eslint/parser": "^8.32.1",
"@types/testing-library__jest-dom": "^5.14.9",
"@types/testing-library__user-event": "^4.1.1",
"@typescript-eslint/parser": "^8.34.0",
"@vitejs/plugin-vue": "^5.2.4",
"@vitejs/plugin-vue-jsx": "^4.2.0",
"@vitest/browser": "^3.1.4",
"@vitest/coverage-v8": "^3.1.4",
"@vitest/browser": "^3.2.3",
"@vitest/coverage-v8": "^3.2.3",
"@vue/eslint-config-prettier": "^10.2.0",
"@vue/test-utils": "^2.4.6",
"@vueuse/router": "^13.2.0",
"change-case": "4.1.2",
"@vueuse/router": "^13.3.0",
"change-case": "5.4.4",
"cross-env": "^7.0.3",
"decompress": "^4.2.1",
"eslint": "^9.27.0",
"eslint-plugin-storybook": "^0.12.0",
"eslint": "^9.28.0",
"eslint-plugin-storybook": "^9.0.8",
"eslint-plugin-vue": "^9.33.0",
"globals": "^16.1.0",
"globals": "^16.2.0",
"husky": "^9.1.7",
"jsdom": "^26.1.0",
"lint-staged": "^15.5.2",
"lint-staged": "^16.1.0",
"monaco-editor": "^0.52.2",
"monaco-yaml": "5.3.1",
"patch-package": "^8.0.0",
"playwright": "^1.52.0",
"playwright": "^1.53.0",
"prettier": "^3.5.3",
"rollup-plugin-copy": "^3.5.0",
"sass": "^1.89.0",
"storybook": "^8.6.14",
"sass": "^1.89.2",
"storybook": "^9.0.8",
"storybook-vue3-router": "^5.0.0",
"typescript": "^5.8.3",
"typescript-eslint": "^8.32.1",
"typescript-eslint": "^8.34.0",
"vite": "^6.3.5",
"vitest": "^3.1.4"
"vitest": "^3.2.3"
},
"optionalDependencies": {
"@esbuild/darwin-arm64": "^0.25.4",
"@esbuild/darwin-x64": "^0.25.4",
"@esbuild/linux-x64": "^0.25.4",
"@rollup/rollup-darwin-arm64": "^4.41.0",
"@rollup/rollup-darwin-x64": "^4.41.0",
"@rollup/rollup-linux-x64-gnu": "^4.41.0",
"@swc/core-darwin-arm64": "^1.11.24",
"@swc/core-darwin-x64": "^1.11.24",
"@swc/core-linux-x64-gnu": "^1.11.24"
"@esbuild/darwin-arm64": "^0.25.5",
"@esbuild/darwin-x64": "^0.25.5",
"@esbuild/linux-x64": "^0.25.5",
"@rollup/rollup-darwin-arm64": "^4.43.0",
"@rollup/rollup-darwin-x64": "^4.43.0",
"@rollup/rollup-linux-x64-gnu": "^4.43.0",
"@swc/core-darwin-arm64": "^1.12.0",
"@swc/core-darwin-x64": "^1.12.0",
"@swc/core-linux-x64-gnu": "^1.12.0"
},
"overrides": {
"bootstrap": {
@@ -141,7 +140,8 @@
},
"el-table-infinite-scroll": {
"vue": "$vue"
}
},
"storybook": "$storybook"
},
"lint-staged": {
"**/*.{js,mjs,cjs,ts,vue}": "eslint --fix"

View File

@@ -15,19 +15,15 @@
<slot v-else />
</span>
</template>
<script>
export default {
props:{
tooltip: {
type: String,
default: ""
},
placement:{
type: String,
default: "top"
},
},
}
<script lang="ts" setup>
withDefaults(
defineProps<{
tooltip?: string;
placement?: string;
}>(),{
tooltip: "",
placement: "",
});
</script>
<style lang="scss" scoped>

View File

@@ -1,5 +1,5 @@
<template>
<Splitpanes class="default-theme" @resize="onResize">
<Splitpanes class="default-theme" v-bind="$attrs" @resize="onResize">
<Pane
v-for="(panel, panelIndex) in panels"
min-size="10"
@@ -47,6 +47,7 @@
@dragleave.prevent
:data-tab-id="tab.value"
@click="panel.activeTab = tab"
@mouseup="middleMouseClose($event, panelIndex, tab)"
>
<component :is="tab.button.icon" class="tab-icon" />
{{ tab.button.label }}
@@ -131,10 +132,31 @@
</div>
</Pane>
</Splitpanes>
<div
v-if="showDropZones"
class="absolute-drop-zones-container"
>
<div
class="new-panel-drop-zone left-drop-zone"
:class="{'panel-dragover': leftPanelDragover}"
@dragover.prevent="leftPanelDragOver"
@dragleave.prevent="leftPanelDragLeave"
@drop.prevent="(e) => newPanelDrop(e, 'left')"
/>
<div
class="new-panel-drop-zone right-drop-zone"
:class="{'panel-dragover': rightPanelDragover}"
@dragover.prevent="rightPanelDragOver"
@dragleave.prevent="rightPanelDragLeave"
@drop.prevent="(e) => newPanelDrop(e, 'right')"
/>
</div>
</template>
<script lang="ts" setup>
import {nextTick, ref, watch, provide} from "vue";
import {nextTick, ref, watch, provide, computed} from "vue";
import {useI18n} from "vue-i18n";
import "splitpanes/dist/splitpanes.css"
@@ -206,6 +228,15 @@
const dragging = ref(false);
const tabContainerRefs = ref<HTMLDivElement[]>([]);
const draggingPanel = ref<number | null>(null);
const realDragging = ref(false);
const leftPanelDragover = ref(false);
const rightPanelDragover = ref(false);
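// side drop zones are only shown while a tab drag is in progress and no whole panel is being dragged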
const showDropZones = computed(() =>
realDragging.value &&
movedTabInfo.value &&
!draggingPanel.value
);
function onResize(e: {size:number}[]) {
let i = 0;
@@ -222,7 +253,10 @@
function cleanUp(){
dragging.value = false;
realDragging.value = false;
mouseXRef.value = -1;
leftPanelDragover.value = false;
rightPanelDragover.value = false;
nextTick(() => {
movedTabInfo.value = null
for(const panel of panels.value) {
@@ -244,6 +278,12 @@
}
function dragover(e: DragEvent) {
// Ensure we set the realDragging flag when a drag operation is in progress
if (movedTabInfo.value) {
realDragging.value = true;
dragging.value = true;
}
// if the mouse has not moved horizontally, stop processing
// this handler is triggered every few ms, so performance and readability are paramount
if(mouseXRef.value === e.clientX){
@@ -381,6 +421,49 @@
}
}
function newPanelDrop(e: DragEvent, direction: "left" | "right") {
if (!movedTabInfo.value) return;
const {tab: movedTab} = movedTabInfo.value;
// Create a new panel with the dragged tab
const newPanel = {
tabs: [movedTab],
activeTab: movedTab
};
// Add the new panel based on the drop direction, not relative to the original panel
if (direction === "left") {
panels.value.splice(0, 0, newPanel);
} else {
panels.value.push(newPanel);
}
// Remove the tab from the original panel
// After adding the new panel, the original panel's index may have changed
// Find it again by looking for the tab in all panels
for (let i = 0; i < panels.value.length; i++) {
const panel = panels.value[i];
const tabIndex = panel.tabs.findIndex(t => t.value === movedTab.value);
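// skip the panel that was just created for the dropped tab so the tab is only removed from its original panel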
if (i === 0 && direction === "left") continue;
if (i === panels.value.length - 1 && direction === "right") continue;
if (tabIndex !== -1) {
panel.tabs.splice(tabIndex, 1);
if (panel.activeTab.value === movedTab.value && panel.tabs.length > 0) {
panel.activeTab = tabIndex > 0
? panel.tabs[tabIndex - 1]
: panel.tabs[0];
}
break;
}
}
cleanUp();
}
function closeAllTabs(panelIndex: number){
panels.value[panelIndex].tabs = [];
}
@@ -463,6 +546,36 @@
panelsCopy.splice(newIndex, 0, movedPanel);
panels.value = panelsCopy;
}
function rightPanelDragOver() {
if (!movedTabInfo.value) return;
rightPanelDragover.value = true;
leftPanelDragover.value = false;
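// clear any in-panel tab previews so only the side drop zone stays highlighted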
removeAllPotentialTabs();
}
function rightPanelDragLeave() {
rightPanelDragover.value = false;
}
function leftPanelDragOver() {
if (!movedTabInfo.value) return;
leftPanelDragover.value = true;
rightPanelDragover.value = false;
removeAllPotentialTabs();
}
function leftPanelDragLeave() {
leftPanelDragover.value = false;
}
function middleMouseClose(event:MouseEvent, panelIndex:number, tab: Tab) {
// Middle mouse button
if (event.button === 1) {
event.preventDefault();
destroyTab(panelIndex, tab);
}
}
</script>
<style lang="scss" scoped>
@@ -620,4 +733,46 @@
transition: background-color 0.2s ease;
}
.absolute-drop-zones-container {
position: absolute;
top: 0;
left: 0;
right: 0;
bottom: 0;
pointer-events: none;
z-index: 100;
display: flex;
justify-content: space-between;
}
.new-panel-drop-zone {
position: relative;
width: 60px;
display: flex;
align-items: center;
justify-content: center;
background-color: rgba(30, 30, 30, 0.5);
transition: all 0.2s ease;
border: 2px dashed var(--ks-border-primary, #444);
border-radius: 4px;
margin: 8px;
pointer-events: auto;
height: calc(100% - 16px);
}
.new-panel-drop-zone:hover,
.new-panel-drop-zone.panel-dragover {
background-color: rgba(40, 40, 40, 0.8);
border-color: var(--ks-border-active, #888);
}
.left-drop-zone {
border-right-width: 2px;
}
.right-drop-zone {
border-left-width: 2px;
}
</style>

View File

@@ -21,8 +21,7 @@
</template>
</el-tab-pane>
</el-tabs>
<section v-if="isEditorActiveTab || activeTab.component" data-component="FILENAME_PLACEHOLDER#container" ref="container" v-bind="$attrs" :class="{...containerClass, 'd-flex flex-row': isEditorActiveTab, 'maximized': activeTab.maximized}">
<section v-if="isEditorActiveTab || activeTab.component" data-component="FILENAME_PLACEHOLDER#container" ref="container" v-bind="$attrs" :class="{...containerClass, 'maximized': activeTab.maximized}">
<EditorSidebar v-if="isEditorActiveTab" ref="sidebar" :style="`flex: 0 0 calc(${explorerWidth}% - 11px);`" :current-n-s="namespace" v-show="explorerVisible" />
<div v-if="isEditorActiveTab && explorerVisible" @mousedown.prevent.stop="dragSidebar" class="slider" />
<div v-if="isEditorActiveTab" :style="`flex: 1 1 ${100 - (isEditorActiveTab && explorerVisible ? explorerWidth : 0)}%;`">
@@ -246,7 +245,6 @@
padding: 0;
display: flex;
flex-grow: 1;
flex-direction: column;
}
:deep(.el-tabs__nav-next),

View File

@@ -320,9 +320,16 @@
}
}
this.$store.dispatch("trigger/search", query).then(triggersData => {
const previousSelection = this.selection;
this.$store.dispatch("trigger/search", query).then(async triggersData => {
this.triggers = triggersData.results;
this.total = triggersData.total;
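// re-apply the previous selection once the table has re-rendered so refreshing does not clear it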
if (previousSelection && this.$refs.selectTable) {
await this.$refs.selectTable.waitTableRender();
this.$refs.selectTable.setSelection(previousSelection);
}
if (callback) {
callback();
}
@@ -378,9 +385,15 @@
return;
}
this.$store.dispatch("trigger/update", {...trigger, disabled: !value})
.then(_ => {
this.loadData();
})
.then(trigger => {
// replace the updated trigger in the list
this.triggers = this.triggers.map(t => {
if (t.id === trigger.id) {
return {triggerContext: trigger, abstractTrigger: t.abstractTrigger};
}
return t;
});
});
},
genericConfirmAction(toast, queryAction, byIdAction, success, data) {
this.$toast().confirm(

View File

@@ -1,5 +1,5 @@
<template>
<div class="h-100 overflow-y-auto no-code">
<div class="no-code">
<Breadcrumbs />
<hr class="m-0">
@@ -19,11 +19,11 @@
import {
BREADCRUMB_INJECTION_KEY, CLOSE_TASK_FUNCTION_INJECTION_KEY,
CREATE_TASK_FUNCTION_INJECTION_KEY, CREATING_TASK_INJECTION_KEY,
EDIT_TASK_FUNCTION_INJECTION_KEY, BLOCKTYPE_INJECT_KEY,
CREATING_TASK_INJECTION_KEY, BLOCKTYPE_INJECT_KEY,
PANEL_INJECTION_KEY, POSITION_INJECTION_KEY,
REF_PATH_INJECTION_KEY, PARENT_PATH_INJECTION_KEY,
FLOW_INJECTION_KEY,
EDITING_TASK_INJECTION_KEY,
} from "./injectionKeys";
import Breadcrumbs from "./components/Breadcrumbs.vue";
import Editor from "./segments/Editor.vue";
@@ -34,7 +34,7 @@
(e: "updateMetadata", value: {[key: string]: any}): void
(e: "reorder", yaml: string): void
(e: "createTask", blockType: string, parentPath: string, refPath: number | undefined, position?: "before" | "after"): boolean | void
(e: "editTask", blockType: string, parentPath: string, refPath: number): boolean | void
(e: "editTask", blockType: string, parentPath: string, refPath?: number): boolean | void
(e: "closeTask"): boolean | void
}>()
@@ -55,9 +55,11 @@
*/
refPath?: number;
creatingTask?: boolean;
editingTask?: boolean;
position?: "before" | "after";
}>(), {
creatingTask: false,
editingTask: false,
position: "after",
refPath: undefined,
blockType: undefined,
@@ -66,7 +68,6 @@
const metadata = computed(() => YAML_UTILS.getMetadata(props.flow));
const creatingTaskRef = ref(props.creatingTask)
const breadcrumbs = ref<Breadcrumb[]>([])
const panel = ref()
@@ -77,13 +78,9 @@
provide(BREADCRUMB_INJECTION_KEY, breadcrumbs);
provide(BLOCKTYPE_INJECT_KEY, props.blockType);
provide(POSITION_INJECTION_KEY, props.position);
provide(CREATING_TASK_INJECTION_KEY, computed(() => creatingTaskRef.value));
provide(CREATE_TASK_FUNCTION_INJECTION_KEY, (blockType, parentPath, refPath) => {
emit("createTask", blockType, parentPath, refPath)
});
provide(EDIT_TASK_FUNCTION_INJECTION_KEY, (blockType, parentPath, refPath) => {
emit("editTask", blockType, parentPath, refPath)
});
provide(CREATING_TASK_INJECTION_KEY, props.creatingTask);
provide(EDITING_TASK_INJECTION_KEY, props.editingTask);
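// the create/edit task callbacks are now provided by the parent wrapper component (see the following file diff)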
provide(CLOSE_TASK_FUNCTION_INJECTION_KEY, () => {
if (breadcrumbs.value[breadcrumbs.value.length - 1].component) {
breadcrumbs.value.pop();
@@ -95,4 +92,13 @@
})
</script>
<style scoped lang="scss" src="./styles/code.scss" />
<style lang="scss" scoped>
.no-code {
height: 100%;
overflow-y: auto;
hr {
margin: 0;
}
}
</style>

View File

@@ -1,32 +1,31 @@
<template>
<div>
<NoCode
:flow="lastValidFlowYaml"
:parent-path="parentPath"
:ref-path="refPath"
:block-type="blockType"
:creating-task="creatingTask"
:position
@update-metadata="(e) => onUpdateMetadata(e)"
@update-task="(e) => editorUpdate(e)"
@reorder="(yaml) => handleReorder(yaml)"
@create-task="(blockType, parentPath, refPath) => emit('createTask', blockType, parentPath, refPath, 'after')"
@close-task="() => emit('closeTask')"
@edit-task="(blockType, parentPath, refPath) => emit('editTask', blockType, parentPath, refPath)"
/>
</div>
<NoCode
:flow="lastValidFlowYaml"
:parent-path="parentPath"
:ref-path="refPath"
:block-type="blockType"
:creating-task="creatingTask"
:editing-task="editingTask"
:position
@update-metadata="(e) => onUpdateMetadata(e)"
@update-task="(e) => editorUpdate(e)"
@reorder="(yaml) => handleReorder(yaml)"
@close-task="() => emit('closeTask')"
/>
</template>
<script setup lang="ts">
import {computed, ref} from "vue";
import {computed, provide, ref} from "vue";
import debounce from "lodash/debounce";
import {useStore} from "vuex";
import * as YAML_UTILS from "@kestra-io/ui-libs/flow-yaml-utils";
import NoCode from "./NoCode.vue";
import {BlockType} from "./utils/types";
import {CREATE_TASK_FUNCTION_INJECTION_KEY, EDIT_TASK_FUNCTION_INJECTION_KEY} from "./injectionKeys";
export interface NoCodeProps {
creatingTask?: boolean;
editingTask?: boolean;
blockType?: BlockType | "pluginDefaults";
parentPath?: string;
refPath?: number;
@@ -37,10 +36,17 @@
const emit = defineEmits<{
(e: "createTask", blockType: string, parentPath: string, refPath: number | undefined, position: "after" | "before"): boolean | void;
(e: "editTask", blockType: string, parentPath: string, refPath: number): boolean | void;
(e: "editTask", blockType: string, parentPath: string, refPath?: number): boolean | void;
(e: "closeTask"): boolean | void;
}>();
provide(CREATE_TASK_FUNCTION_INJECTION_KEY, (blockType, parentPath, refPath) => {
emit("createTask", blockType, parentPath, refPath, "after")
});
provide(EDIT_TASK_FUNCTION_INJECTION_KEY, (blockType, parentPath, refPath) => {
emit("editTask", blockType, parentPath, refPath)
});
const store = useStore();
const flowYaml = computed<string>(() => store.getters["flow/flowYaml"]);

Some files were not shown because too many files have changed in this diff.