Compare commits

...

198 Commits

Author SHA1 Message Date
github-actions[bot]
7b431f6d71 chore(version): update to version '0.23.17' 2025-09-23 12:50:57 +00:00
nKwiatkowski
b7846f42c6 chore(version): update to version '0.23.16' 2025-09-23 14:50:31 +02:00
Bart Ledoux
2ded421c4f tests(core): make one test pass 2025-09-23 14:42:47 +02:00
github-actions[bot]
95f6faf557 chore(version): update to version '0.23.17' 2025-09-23 12:34:27 +00:00
nKwiatkowski
e23b0d1f28 chore(version): update to version '0.23.16' 2025-09-23 14:33:53 +02:00
github-actions[bot]
c630d548f6 chore(core): localize to languages other than english (#11462)
Co-authored-by: GitHub Action <actions@github.com>
2025-09-23 13:24:38 +02:00
Bart Ledoux
516bf76816 Revert "chore(i18n): 👿 add changes to translations for automation"
This reverts commit 495e480bde.
2025-09-23 13:21:50 +02:00
Bart Ledoux
495e480bde chore(i18n): 👿 add changes to translations for automation 2025-09-23 13:20:42 +02:00
Bart Ledoux
0a5509b884 chore: split ui unit test config file 2025-09-23 13:04:34 +02:00
github-actions[bot]
e12f51c386 chore(version): update to version '0.23.17' 2025-09-23 09:41:25 +00:00
brian.mulier
c969eff20a fix(system): avoid trigger locking after scheduler restart
closes #11434
2025-09-22 18:37:55 +02:00
brian.mulier
29db1efc40 fix(ci): same CI as develop 2025-09-22 18:37:50 +02:00
brian-mulier-p
21d8a71255 fix(tests): enforce closing consumers after each test (#11399) 2025-09-19 16:32:11 +02:00
brian-mulier-p
44bcbe713b fix(core): avoid ClassCastException when doing secret decryption (#11393)
closes kestra-io/kestra-ee#5191
2025-09-19 13:51:01 +02:00
github-actions[bot]
708ffaab38 chore(version): update to version '0.23.16' 2025-09-16 09:20:56 +00:00
Loïc Mathieu
569fc10d48 chore(version): upgrade to 0.23.15 2025-09-09 10:08:19 +02:00
Miloš Paunović
7c23461efd fix(flow)*: properly handle tab closing by clicking the cross icon in the corner of the panel (#11090)
Relates to https://github.com/kestra-io/kestra/issues/10981.
2025-09-04 14:22:12 +02:00
github-actions[bot]
b09b1fdafe chore(version): update to version '0.23.14' 2025-09-02 12:21:34 +00:00
Roman Acevedo
adcab1893b ci: fix setversion-tag.yml not triggering a main.yml job on a pushed tag
the missing token: ${{ secrets.GH_PERSONAL_TOKEN }} is the only difference between this CI and EE CI, so it is probably the right fix

# Conflicts:
#	.github/workflows/setversion-tag.yml
2025-09-02 10:40:47 +02:00
Loïc Mathieu
5710c79954 fix(executions): clear errors/finally/afterExecution branches when changing the state of a taskrun
As changing the state of a taskrun will restart the flow, if we didn't clear those branches, the flow would not restart properly.

Fixes https://github.com/kestra-io/kestra-ee/issues/3211
2025-08-29 16:25:29 +02:00
github-actions[bot]
55a2384253 chore(version): update to version '0.23.13' 2025-08-26 10:35:39 +00:00
brian.mulier
4975c907a7 fix(logs): emitAsync is now keeping messages order 2025-08-25 16:54:13 +02:00
brian.mulier
87d508648d fix(logs): higher max message length to keep stacktraces in a single log 2025-08-25 16:53:34 +02:00
brian.mulier
85da1089ec chore(deps): bump Micronaut platform to 4.9.2
closes #10626
closes #10788
2025-08-25 16:53:34 +02:00
Loïc Mathieu
68e1b9c80f fix(system): properly close the ScheduledExecutorService tasks
This avoids having running threads while the component is supposed to be closed.
2025-08-20 15:52:48 +02:00
nKwiatkowski
21c24e0349 fix: retry flaky test 2025-08-20 11:51:48 +02:00
nKwiatkowski
ed8a908b22 fix: retry flaky test 2025-08-20 11:50:49 +02:00
Nicolas K.
86d97bed77 fix(test): disable kafka concurrency queue test (#10755)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-19 18:12:59 +02:00
nKwiatkowski
f2c3cf4f8c chore: update version to 0.23.12 2025-08-19 15:17:56 +02:00
brian.mulier
b3c896eccb fix(core): change cache policy on files returned by webserver that need to stay fresh
closes #7499
2025-08-19 13:13:53 +02:00
brian.mulier
58d36f5948 fix(dashboards): quick fix to avoid infinite refresh and restore refresh dashboard feature 2025-08-19 13:05:31 +02:00
Florian Hussonnois
922a655a4c fix(core): fix preconditions rendering for ExecutionOutputs (#10651)
Ensure that preconditions are always re-rendered for any
new executions

Changes:
* add new fluent skipCache methods on RunContextProperty and Property
  classes

Fix: #10651
2025-08-18 21:00:06 +02:00
YannC.
94f0e211ba fix: compilation issue 2025-08-14 15:45:19 +02:00
Loïc Mathieu
2b590bf955 fix(execution): parallel flowable may not end all child flowables
Parallel flowable tasks like `Parallel`, `Dag` and `ForEach` are racy. When a task fails in a branch, other concurrent branches that contain flowables may never end.
We make sure that all children are terminated when a flowable is itself terminated.

Fixes #6780
2025-08-14 12:25:56 +02:00
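The fix described above boils down to forcing still-running children into a terminal state once their parent flowable terminates. A minimal, hypothetical sketch of that idea (the `TaskRun` type and states below are illustrative, not Kestra's actual classes):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, only to illustrate the shape of the fix.
class TaskRun {
    enum State { CREATED, RUNNING, SUCCESS, FAILED, KILLED }

    final String id;
    State state;
    final List<TaskRun> children = new ArrayList<>();

    TaskRun(String id, State state) {
        this.id = id;
        this.state = state;
    }

    boolean isTerminated() {
        return state == State.SUCCESS || state == State.FAILED || state == State.KILLED;
    }

    // When a flowable parent terminates, make sure no child branch is left running forever.
    void terminateChildren(State terminalState) {
        for (TaskRun child : children) {
            if (!child.isTerminated()) {
                child.state = terminalState;
                child.terminateChildren(terminalState); // recurse into nested flowables
            }
        }
    }
}
```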
Loïc Mathieu
b61eeaff8c fix(execution): concurrency limit didn't work with afterExecutions
This is because the execution is never considered fully terminated, so the concurrency limit is not handled properly.
This should also affect SLA, trigger locks, and other cleanup.

The root issue is that, with a worker task from afterExecution, there are no other updates on the execution itself (as it is already terminated), so no further execution messages are processed by the executor.

Because of that, the worker task result message from the afterExecution block is the last message, but unfortunately, as worker task result messages have no flow attached, the computation of the final termination is incorrect.
The solution is to load the flow inside the executor when it is null and the execution is terminated, which should only occur inside afterExecution.

Fixes #10657
Fixes #8459
Fixes #8609
2025-08-13 09:32:25 +02:00
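A rough sketch of the described solution, using hypothetical names rather than the actual executor code: when a message arrives without a flow attached and the execution is already terminated (which should only happen for afterExecution results), the executor loads the flow itself before computing the final termination.

```java
import java.util.Optional;

// Illustrative stand-ins for the real executor collaborators.
record Flow(String namespace, String id) {}

record Execution(String flowId, boolean terminated) {
    boolean isTerminated() { return terminated; }
}

interface FlowRepository {
    Optional<Flow> findByExecution(Execution execution);
}

class ExecutorSketch {
    private final FlowRepository flowRepository;

    ExecutorSketch(FlowRepository flowRepository) {
        this.flowRepository = flowRepository;
    }

    // Worker task results from afterExecution have no flow attached; load it
    // when the execution is already terminated so the final termination
    // computation has everything it needs.
    Flow resolveFlow(Flow flow, Execution execution) {
        if (flow == null && execution.isTerminated()) {
            return flowRepository.findByExecution(execution).orElse(null);
        }
        return flow;
    }
}
```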
Prayag
26d7fa47d3 fix(core): Enter key is now validating filter / refreshing data (#9630)
closes #9471

---------

Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-08-12 17:29:14 +02:00
Loïc Mathieu
bece420c9a fix(executions): SLA monitor should take into account restarted executions 2025-08-12 11:49:14 +02:00
Loïc Mathieu
081066888f fix(executions): concurrency limit exceeded when restarting an execution
Fixes #7880
2025-08-12 11:49:02 +02:00
YannC.
c721fe68a7 chore: update version to 0.23.11 2025-08-12 10:47:34 +02:00
Loïc Mathieu
88c77084f5 fix(executions): correctly fail the request when trying to resume an execution with the wrong inputs
Fixes #9959
2025-08-12 09:40:23 +02:00
brian.mulier
3846ac87e3 fix(dashboard): properly use time filters in queries
closes kestra-io/kestra-ee#4389
2025-08-11 22:29:56 +02:00
Nicolas K.
b9c843f01d feat(releases): add test jar to maven central deployment (#10675)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-11 17:17:06 +02:00
brian.mulier
9686718130 tests(core): add a test to taskrunners to ensure they work multiple times on the same working directory
part of kestra-io/plugin-ee-kubernetes#45
2025-08-11 15:00:32 +02:00
Loïc Mathieu
da40f46b4a fix(executions): properly fail the task if it contains unsupported unicode sequence
This occurs in Postgres when using the `\u0000` unicode sequence. Postgres refuses to store any JSONB with this sequence as it has no textual representation.
We now properly detect that and fail the task.

Fixes #10326
2025-08-11 11:54:35 +02:00
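Postgres does reject JSONB containing the `\u0000` escape, because the NUL character has no representation in its text type. A minimal illustration of the kind of up-front check involved (hypothetical, not the actual Kestra code):

```java
import java.util.Map;

public class NullByteCheck {
    // Postgres rejects JSONB values containing the NUL character (u0000),
    // so detect it up front and fail with a clear error instead of a storage exception.
    static void validateOutputs(Map<String, String> outputs) {
        outputs.forEach((key, value) -> {
            if (value != null && value.indexOf('\u0000') >= 0) {
                throw new IllegalArgumentException(
                    "Output '" + key + "' contains the \\u0000 sequence, which cannot be stored as JSONB");
            }
        });
    }

    public static void main(String[] args) {
        validateOutputs(Map.of("ok", "plain text"));          // passes
        validateOutputs(Map.of("bad", "binary\u0000data"));   // throws
    }
}
```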
Piyush Bhaskar
4ae207ed81 fix(flows): copy trigger url properly. (#10645) 2025-08-08 13:14:49 +05:30
brian.mulier
0a2fa4d3b2 fix(core): ensure props with defaults are not marked as required in generated doc 2025-08-07 15:09:46 +02:00
brian.mulier
995d5c1ac2 fix(core): wrong @NotNull import leading to key not being marked as required
closes #9287
2025-08-07 15:09:46 +02:00
Loïc Mathieu
92bf135c02 fix(test): RestactCaseTest.restartFailedWithFinally() should use executionService.isTerminated() 2025-08-06 09:56:43 +02:00
brian.mulier
d318281342 chore(version): upgrade to version 0.23.10 2025-08-05 10:56:31 +02:00
Loïc Mathieu
3f68749276 fix(executions): Don't create outputs from the Subflow task when we didn't wait
If we didn't wait for the subflow execution, we cannot have access to its outputs.
2025-07-31 13:07:46 +02:00
brian.mulier
bc07dfbf1c fix(core): prevent follow execution from being discarded too early
closes #10472
closes #7623
2025-07-31 10:26:16 +02:00
Piyush Bhaskar
3b3aa495b0 fix(core): update running count and status of executions in concurrency (#10418) 2025-07-29 18:30:40 +05:30
Loïc Mathieu
a945780e4d chore(version): upgrade to version 0.23.9 2025-07-29 14:57:00 +02:00
Roman Acevedo
d512f86927 fix(cli): tenantService was injected directly, this is not working in cli 2025-07-29 14:42:33 +02:00
github-actions[bot]
7f355dd730 chore(core): localize to languages other than english (#10405)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-29 10:50:29 +02:00
Loïc Mathieu
ffa33b1a7a chore(version): version 0.23.8 2025-07-29 10:11:12 +02:00
Roman Acevedo
a5b4ec3b2e fix(triggers): bulk action on triggers did not take into account that this is async (#10307) 2025-07-29 09:31:43 +02:00
Miloš Paunović
5585e9df47 fix(namespaces): make sure the namespace parameter is properly passed when reading a file (#10384)
Relates to https://github.com/kestra-io/kestra/issues/10363.
Relates to https://github.com/kestra-io/kestra-ee/issues/4514.
2025-07-29 09:20:29 +02:00
YannC
f8cb335a16 fix: set postgres and mysql queue offset as a bigint (#10344) 2025-07-28 16:28:47 +02:00
Loïc Mathieu
af9129f900 fix(core): compilation issue 2025-07-28 15:08:57 +02:00
yuri
177ba35e15 fix(core): amend misc label-related issues (#10044)
* fix(core): amend misc label-related issues

* re-enabled bulk update of label value
* re-enabled merging flow-execution labels by key
* made duplicated keys rejection readable
* forced multiple validations within `RequestUtils`
* ensured existing labels can be overridden
* added multiple tests validating complex scenarios

BREAKING CHANGE: switched from first to last label value override
BREAKING CHANGE: preventing empty key/value labels
BREAKING CHANGE: preventing whitespace in key

* fix(core): reflect feedback

* Deduplicated a list inside the `Labels` task.
* Worked around label mutation at `Worker`.
* Attempted to deduplicate labels within `Execution` as much as possible.

* fix(core): remove irrelevant changes
2025-07-28 11:31:19 +02:00
Florian Hussonnois
b99946deff fix(system): avoid potential NPE in ServiceLivenessManager (#10338)
Avoid a potential NPE in ServiceLivenessManager when
a local service is unregistered during shutdown before the liveness probe completes

Fix: #10338
2025-07-25 12:34:51 +02:00
Florian Hussonnois
19428ad344 fix(system): ignore state transition failure for indexer
Fix: kestra-io/kestra-ee#4474
2025-07-25 12:34:44 +02:00
Loïc Mathieu
162764ff0d fix(executions): flow concurrency limit not honored when executions are created at a high rate
This is due to the fact that we now process the execution queue concurrently, so there is a race when counting currently running executions. This can be seen easily using a ForEachItem, as it could create tens or hundreds of executions almost instantly, leading to almost all of those executions being started as they would all see 0 executions running...

Using a dedicated execution running queue, as done in EE, would serialize the messages and fix the issue.

However, if using multiple executor instances and a concurrency limit of 1, there is a theoretical race as no locks will be taken if no execution is running. The maximum surge of executions could be as high as the number of executors, but this race is less likely to happen in real-world scenarios.

Fixes #10167
2025-07-25 12:06:26 +02:00
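The race is a classic check-then-act problem: many executions admitted concurrently all read the same running count before any of them is recorded, so even a limit of 1 can let several through. Routing admissions through a single consumer (the dedicated queue mentioned above) serializes the check and the update. A small illustrative sketch, not Kestra code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencyLimitSketch {
    static final int LIMIT = 1;
    static final AtomicInteger running = new AtomicInteger();

    // Racy: several threads can pass the check before any increment is visible,
    // which is exactly what happens when the execution queue is processed concurrently.
    static boolean admitRacy() {
        if (running.get() < LIMIT) {
            running.incrementAndGet();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Serialized: a single-threaded executor plays the role of the dedicated
        // "execution running" queue, so check and update happen one admission at a time.
        ExecutorService admissionQueue = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 100; i++) {
            admissionQueue.submit(() -> {
                if (running.get() < LIMIT) {
                    running.incrementAndGet(); // safe: no other admission runs concurrently
                }
            });
        }
        admissionQueue.shutdown();
    }
}
```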
Florian Hussonnois
ccd7b43b97 fix(core): fix search filtering based on endDate field (kestra-io/kestra-ee#4446)
Related-to: kestra-io/kestra-ee#4446
2025-07-25 11:14:35 +02:00
Loïc Mathieu
53f881ed60 fix(executions): race condition inside nested ForEach with concurrency
Fixes #10167
2025-07-25 09:46:46 +02:00
Piyush Bhaskar
0759aaeae8 fix(executions): update query parameter for state filtering (#10315) 2025-07-24 14:42:01 +05:30
Miloš Paunović
fc8b389d09 fix(executions): make sure outputs do not overflow over right drawer (#10238)
Closes https://github.com/kestra-io/kestra/issues/10232.
2025-07-24 10:40:22 +02:00
MilosPaunovic
8355eb191e chore(flows): show small execution charts on flow listing
Co-authored-by: YannC. <ycoornaert@kestra.io>
2025-07-24 10:34:32 +02:00
Piyush Bhaskar
50f72f8ea3 fix(core): check null uri (#10309) 2025-07-24 12:48:42 +05:30
Nicolas K.
ae14d980a4 fix(cli): #10062 add tenant to load flows properly at the startup (#10290)
* fix(cli): #10062 add tenant to load flows properly at the startup

* fix(cli): #10062 add fallback tenant to ee service

* fix(cli): #10062 use tenant id in all cli

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-23 15:36:42 +02:00
Florian Hussonnois
bc1a08b418 chore(version): bump to version '0.23.7' 2025-07-22 15:50:22 +02:00
Nicolas K.
e264c0b75d fix(pebble): #8953 add more flexible day number conversion method (#10205)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-22 14:59:24 +02:00
Piyush Bhaskar
fccbb6b648 fix(logs): update query filter to show logs ns and flowwise (#10248) 2025-07-22 18:04:27 +05:30
MilosPaunovic
a243c563d3 fix(namespaces)*: prevent overwriting namespace file content with undefined string
Relates to https://github.com/kestra-io/kestra-ee/issues/4439.
2025-07-22 14:28:28 +02:00
Loïc Mathieu
45ad1f6ee4 fix(tests): strengthen awaitExecution predicate
In some test situations, awaitExecution may receive old messages, so we strengthen the predicate to be sure to wait for the correct execution: the one that ends successfully.
2025-07-22 12:47:40 +02:00
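A hypothetical illustration of what strengthening such a predicate looks like in a test helper: match on the execution id and require a successful terminal state, so stale messages from earlier executions no longer satisfy the wait.

```java
import java.util.function.Predicate;

// Illustrative types only, not the actual Kestra test utilities.
record ExecutionMsg(String executionId, String state) {
    boolean isSuccess() { return "SUCCESS".equals(state); }
}

class AwaitPredicates {
    // Weak: any message for the right execution matches, even an old RUNNING one.
    static Predicate<ExecutionMsg> byId(String executionId) {
        return msg -> msg.executionId().equals(executionId);
    }

    // Strengthened: only the message where that execution ends successfully matches.
    static Predicate<ExecutionMsg> byIdAndSuccess(String executionId) {
        return msg -> msg.executionId().equals(executionId) && msg.isSuccess();
    }
}
```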
Piyush Bhaskar
8359bfc680 fix(triggers): ensure clearing the selection. (#10245) 2025-07-22 16:00:31 +05:30
YannC.
30a808188c fix: handle label filter with AND instead of OR for flows
close #4390
2025-07-22 09:42:06 +02:00
Loïc Mathieu
5121ceb63a fix(system): compilation issue 2025-07-21 12:36:03 +02:00
Loïc Mathieu
1dae994910 fix(executions)*: restart with finally or afterExecution
When a flow fails and is restarted and it contains either a finally or an afterExecution block, those are not reset, so the restart will skip all tasks and terminate the flow.
The fix resets the status of those tasks so they are restarted.

Fixes #10155
2025-07-21 12:27:54 +02:00
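The reset can be sketched generically (hypothetical types, not Kestra's): before restarting, task runs that belong to the finally / afterExecution blocks are put back into a non-terminal state so the restart does not treat them as already done and end the flow immediately.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model of the reset step described above.
class RestartPreparation {
    record TaskRunSketch(String taskId, String state) {}

    // specialTaskIds: ids of tasks in the flow's finally / afterExecution blocks,
    // assumed to be computed elsewhere.
    static List<TaskRunSketch> resetSpecialBranches(List<TaskRunSketch> taskRuns, Set<String> specialTaskIds) {
        return taskRuns.stream()
            .map(t -> specialTaskIds.contains(t.taskId())
                // back to CREATED so the restarted execution runs them again
                ? new TaskRunSketch(t.taskId(), "CREATED")
                : t)
            .toList();
    }
}
```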
Loïc Mathieu
26a82fce95 fix(executions): support unicode file name inside the internal storage
Fixes #9550
2025-07-21 12:27:25 +02:00
Bart Ledoux
a8584a8a33 feat(flows): add setting to disable hovers in editor 2025-07-17 10:43:05 +02:00
Piyush Bhaskar
5737216b34 fix(triggers): only updates the trigger that matches both flowId and triggerId (#10156) 2025-07-17 14:10:44 +05:30
YannC.
747c424f1f chore(version): upgrade to v0.23.6 2025-07-15 14:52:54 +02:00
brian-mulier-p
33bfc979c5 fix(core): trim expressions in select & multiselect to be able to use '|' instead of '>-' (#10017)
closes #10016
2025-07-09 16:39:02 +02:00
nKwiatkowski
58ceb66cfb chore(version): upgrade to v0.23.5 2025-07-08 15:18:25 +02:00
Loïc Mathieu
a08266593f fix(webserver)*: bulk set labels removes existing labels
Fixes #9764
2025-07-07 15:26:09 +02:00
Loïc Mathieu
d5d5f457b4 fix(system): force running afterExecution tasks even if the execution is killed
Fixes #9852
2025-07-07 12:41:31 +02:00
François Delbrayelle
cacac2239d fix(taskrunner): abstract task runner (#9769) 2025-07-04 09:50:08 +02:00
nKwiatkowski
5c45bd5eb5 feat(cicd): #4006 add javadoc and sources to cli release 2025-07-03 14:58:12 +02:00
Miloš Paunović
fdf126202c fix(namespaces)*: take pagination into account when browsing namespace flows (#9849)
Closes https://github.com/kestra-io/kestra/issues/9805.
2025-07-02 11:48:53 +02:00
Nicolas K.
0f3c745bb9 feat(cicd): #4006 change signing method (#9854)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-02 11:14:07 +02:00
YannC.
5a6a0ff3e3 chore(version): upgrade to v0.23.4 2025-07-01 17:56:42 +02:00
Loïc Mathieu
f5f88e18ce feat(cluster): persist maintenance mode in the database
Part-of: https://github.com/kestra-io/kestra-ee/issues/3735
2025-07-01 17:56:42 +02:00
Nicolas K.
12f521860e feat(cicd): #4006 migrate to maven central (#9807)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-01 16:19:11 +02:00
Nicolas K.
b6cf3e1f93 feat(cicd): #4006 migrate sonatype to maven central (#9803)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-01 15:02:05 +02:00
YannC.
7125885ea9 fix(triggers): correctly replace the update triggers when disabling 2025-07-01 14:21:22 +02:00
YannC.
0b29a4a247 feat(triggers): avoid clearing selection when refreshing in triggers list 2025-07-01 14:21:22 +02:00
Piyush Bhaskar
0377f87c66 feat(tenant): all routes on /main tenant 2025-07-01 11:57:13 +05:30
Loïc Mathieu
06bd0c6380 fix(system)*: mitigate possible deadlock for execution delay and SLA
In case multiple instances of the executor are started, the execution delay loop and the SLA monitoring loop risk duplicating execution resumes or execution SLA violation computations.
This could create some race conditions and duplicate execution updates.
But this may also risk creating some deadlocks, as two instances of the executor may try to lock the same execution to restart it (or fail it due to SLA).
2025-06-30 14:33:54 +02:00
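The pattern implied here — try to take the per-execution lock without blocking and skip the item if another loop or instance already holds it — can be illustrated in-process with standard locks (in a multi-instance deployment the lock would live in the database, but the shape is the same). A hypothetical sketch, not Kestra's implementation:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative "skip if already locked" pattern: neither the execution delay loop
// nor the SLA monitoring loop waits on an execution someone else is handling,
// so they cannot end up blocking each other.
class ExecutionLocks {
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    boolean tryWithLock(String executionId, Runnable work) {
        ReentrantLock lock = locks.computeIfAbsent(executionId, id -> new ReentrantLock());
        if (!lock.tryLock()) {
            return false; // another instance/loop is handling this execution: skip it
        }
        try {
            work.run();
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```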
brian.mulier
cd39995f24 fix(core): use namespace prefix instead of equals
On the namespace/flows and namespace/executions pages, and when a default namespace is set on the Logs page

closes kestra-io/kestra-ee#4200
2025-06-25 17:48:54 +02:00
Loïc Mathieu
938e156bd5 chore(system): call the close runnable later 2025-06-25 14:37:46 +02:00
brian.mulier
1fb7943738 chore(version): update to version '0.23.3' 2025-06-24 17:33:04 +02:00
brian-mulier-p
09d648cf86 fix(variables): put fixtures files with arbitrary key and extract it back as root level "files" variable (#9689) 2025-06-24 17:32:37 +02:00
brian.mulier
02a22faed4 chore(version): update to version '0.23.2' 2025-06-24 14:19:20 +02:00
Ludovic DEHON
169d6610f5 test(core): fix failing test on schedule 2025-06-24 14:19:20 +02:00
Loïc Mathieu
e253958cf4 fix(system): possible NPE on trigger when computing variables 2025-06-24 14:19:20 +02:00
brian-mulier-p
c75f06a036 fix: avoid failure to deserialize json objects that have unknown fields with http client (#9668)
closes #9667
2025-06-24 14:19:20 +02:00
Loïc Mathieu
b3b1b7a5cb feat(executions)*: add tasks to set and unset execution variables
Closes #9555
2025-06-24 14:19:20 +02:00
Loïc Mathieu
34e07b9e2b fix(execution): parent flow never ends when a subflow fails due to SLA
This is because the executor didn't have the flow inside it, so the execution is not correctly terminated.
It may fix other issues (like flow triggers, purge, ...)

Fixes #9618
2025-06-20 18:04:12 +02:00
Loïc Mathieu
85b449c926 fix(system): flow graph fails to be created while editing a flow
Fixes #9551

It is not the validation per se that fails; it's the graph dependency computation, which is also done while editing a flow, that fails.
2025-06-20 12:09:18 +02:00
Loïc Mathieu
0017ead9b3 fix(system)*: runIf inside a WorkingDirectory can crash the Worker
Fixes #9639
2025-06-20 12:09:04 +02:00
Barthélémy Ledoux
b0292f02f7 fix(ui): default value for expression cannot be null (#9636) 2025-06-20 11:12:32 +02:00
Piyush Bhaskar
202dc7308d feat(namespaces): show ns description (#9610)
* feat(namespaces): show ns description

* add slot and data for description
2025-06-20 13:59:03 +05:30
François Delbrayelle
3273a9a40c fix(plugin-versioning): replace current JAR if more recent (#9629) 2025-06-20 09:51:21 +02:00
Loïc Mathieu
bd303f4529 fix(system): support allowFailure and allowWarning for the Pause task
Fixes #9416
2025-06-19 17:34:38 +02:00
Barthélémy Ledoux
db57326f0f tests: nocode editor (#9624) 2025-06-19 14:21:15 +02:00
github-actions[bot]
90a576490f chore(version): update to version '0.23.1' 2025-06-19 10:32:53 +00:00
Loïc Mathieu
2cdd968100 feat(system): store version in the settings 2025-06-19 12:23:20 +02:00
Barthélémy Ledoux
adfc3bf526 perf(ui): load a sample schema while waiting (#9558) 2025-06-19 11:34:15 +02:00
Nicolas K.
3a61f9b1ba Fix/tutorial flows with migration (#9620)
* fix(core): #9609 delete tutorial flows and triggers before migrating the database

* fix(core): #9609 delete tutorial flows and triggers before migrating the database for EE version

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-06-19 10:58:29 +02:00
YannC
64e3014426 fix: correctly use default tenant when synchronizing file with local (#9605)
close #9568
2025-06-19 10:04:58 +02:00
François Delbrayelle
1f68e5f4ed fix(podman): do not pass the tag directly to pullImageCmd (withTag) (#9607) 2025-06-18 18:50:54 +02:00
François Delbrayelle
9bfa888e36 fix(plugin): FileSystems.newFileSystem caused a "Path component should be /" error in plugin tests (#9570) 2025-06-18 16:03:45 +02:00
github-actions[bot]
691a77538a chore(version): update to version '0.23.0' 2025-06-17 09:35:23 +00:00
Bart Ledoux
b07086f553 chore: update ui-libs 2025-06-17 11:21:21 +02:00
Ludovic DEHON
ee12c884e9 fix(tasks): sleep example is a full one 2025-06-16 15:02:34 +02:00
Barthélémy Ledoux
712d6da84f fix(ui): make file panel appear beside main panel in namespace (#9546) 2025-06-16 14:45:05 +02:00
Bart Ledoux
fcc5fa2056 fix: package-lock 2025-06-16 14:44:01 +02:00
Loïc Mathieu
dace30ded7 fix(system): compilation issue 2025-06-16 14:18:55 +02:00
github-actions[bot]
2b578f0f94 chore(version): update to version '0.23.0-rc5-SNAPSHOT' 2025-06-16 12:05:27 +00:00
Florian Hussonnois
91f958b26b fix(executor): delete WorkerJobRunning for any terminated task (#9493)
Make ExecutorService responsible for deleting WorkerJobRunning
when a terminated TaskRun is added to an execution.

Changes:
 - Remove unnecessary read before delete on WorkerJobRunning table.

Close: #9493
2025-06-16 14:03:11 +02:00
Bart Ledoux
d7fc6894fe tests: fix storybook tests 2025-06-16 13:29:34 +02:00
Bart Ledoux
c286348d27 fix(ui): make array and KV Pairs work in nocode 2025-06-16 12:17:23 +02:00
brian.mulier
de4ec49721 fix(core): yaml utils migration 2025-06-16 11:18:47 +02:00
Barthélémy Ledoux
1966ac6012 fix: cleanup empty metadata to fix variable creation (#9529) 2025-06-16 11:17:52 +02:00
Barthélémy Ledoux
a293a37ec9 fix(ui): nocode API calls on EE needs tenant (#9527) 2025-06-16 11:17:43 +02:00
Barthélémy Ledoux
f295724bb6 fix: small tweaks on tabs (#9520) 2025-06-16 11:17:34 +02:00
Barthélémy Ledoux
06505ad977 fix(ui): snafu on duplicate input pair (#9514) 2025-06-16 11:15:30 +02:00
Barthélémy Ledoux
cb31ef642f fix(ui): [nocode] make dag tasks work (#9506) 2025-06-16 11:14:17 +02:00
Barthélémy Ledoux
c320323371 fix(ui): nocode updating inputs from yaml (#9430) 2025-06-16 11:12:35 +02:00
Barthélémy Ledoux
a190cdd0e7 fix(ui): add datepicker to nocode string field (#9351)
Co-authored-by: GitHub Action <actions@github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2025-06-16 11:12:27 +02:00
Barthélémy Ledoux
0678f7c5e9 fix(ui): rename namespace field (#9492) 2025-06-16 11:08:05 +02:00
Barthélémy Ledoux
f39ba5c95e fix(ui): prevent cursor change in Editor component when modelValue is updated from outside (#9371) 2025-06-16 11:07:55 +02:00
Karuna Tata
b4e334c5d8 feat(ui): drag and convert tabs to panels (#9198)
Co-authored-by: Barthélémy Ledoux <bledoux@kestra.io>
2025-06-16 11:07:37 +02:00
Bart Ledoux
561380c942 fix(ui): restore add button as a button 2025-06-16 11:07:25 +02:00
Satvik Kushwaha
68b4867b5a fix(ui): make download and preview visible for text outputs (#8348)
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
2025-06-16 11:06:24 +02:00
Barthélémy Ledoux
cb7f99d107 fix(ui): variables should work with duplicated keys (#9425) 2025-06-16 11:05:17 +02:00
Barthélémy Ledoux
efac7146ff fix: properly detect condition fields (#9353) 2025-06-16 11:02:41 +02:00
Barthélémy Ledoux
11de42c0b8 fix(ui): nocode - open onPause in a new tab (#9366) 2025-06-16 11:02:31 +02:00
Barthélémy Ledoux
b58d9e10dd fix: initialize array fields without any value (#9367) 2025-06-16 11:00:04 +02:00
Barthélémy Ledoux
e25e70d37e refactor: load nocode root form from server schema (#9327) 2025-06-16 10:59:53 +02:00
Karuna Tata
f2dac28997 fix(ui): clear selection of retry form radio buttons (#9268)
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
Thank you so much for this great work! ❤️
2025-06-16 10:59:44 +02:00
Barthélémy Ledoux
0ac8819d95 fix(ui): allow key of sub-tasks to be other than tasks (#9333) 2025-06-16 10:59:24 +02:00
Ludovic DEHON
d261de0df3 fix(core): robots.txt was not served
close kestra-io/kestra#9015
2025-06-13 23:01:48 +02:00
brian.mulier
02cac65614 fix(core): filters were triggering endless refresh
closes #9508
2025-06-13 16:25:34 +02:00
MilosPaunovic
5064687b7e fix(core)*: make sure tour always opens with code & topology tabs visible (#9513)
Closes https://github.com/kestra-io/kestra-ee/issues/4073.
2025-06-13 08:55:20 +02:00
YannC
7c8419b266 fix(ui): Better duplicate key management in the pair component (#9431)
* fix(ui): Better duplicate key management in the pair component

close #9220

* fix(ui): add a have-error prop on inputText that shows a red shadow

* refactor: simplify inputpair component (#9491)

* fix: only show lock if disabled

* alertState define order

---------

Co-authored-by: Barthélémy Ledoux <bledoux@kestra.io>
2025-06-12 13:28:02 +02:00
Roman Acevedo
84e4c62c6d fix(tests): test editor was showing the previously shown plugin doc
fixes https://github.com/kestra-io/kestra-ee/issues/4066
2025-06-12 13:21:29 +02:00
Nicolas K.
9aa605e23b Feat/rework compatibility layer (#9490)
* feat(core): rework compatibility layer

* feat(core): #4062 rework compatibility layer

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-06-12 10:42:49 +02:00
Roman Acevedo
faa77aed79 feat(tests): add execution url in test result 2025-06-12 10:03:05 +02:00
brian-mulier-p
fdce552528 feat(core): introduce tasksWithState autocompletion (#9485)
part of #8350
2025-06-12 09:55:57 +02:00
brian.mulier
a028a61792 fix(core): avoid infinite load upon route redirect (#9480)
closes #9479
2025-06-11 17:03:52 +02:00
brian.mulier
023a77a320 fix(core): properly map labels filters from query (#9480)
closes #9324
2025-06-11 17:03:52 +02:00
brian.mulier
bfee04bca2 fix(core): prevent incompatible timeRange & start/endDate filters + prevent multiple scope filters (#9480)
closes #9240
2025-06-11 17:03:52 +02:00
YannC
3756f01bdf fix(ui): base the required prop on the requiredProperties list (#9433)
close #9377
2025-06-11 13:09:27 +02:00
YannC
c1240d7391 feat(ui): allow closing a tab with a middle mouse click, like in a browser/IDE (#9434) 2025-06-11 08:55:13 +02:00
YannC
ac37ae6032 fix(core): use Min annotation instead of Positive (#9432)
close #9380
2025-06-10 17:15:11 +02:00
github-actions[bot]
9e51b100b0 chore(version): update to version '0.23.0-rc3-SNAPSHOT' 2025-06-10 12:51:54 +00:00
Miloš Paunović
bc81e01608 fix(core)*: properly display chart colors for logs (#9429) 2025-06-10 13:51:56 +02:00
YannC.
9f2162c942 feat(): add Kestra plugin in the list 2025-06-10 12:44:09 +02:00
brian-mulier-p
97992d99ee fix(core): properly handle dots in nested keys & commas in quoted filter values (#9410) 2025-06-10 11:55:30 +02:00
brian.mulier
f90f6b8429 chore(deps): bump vitest to 3.2.3 2025-06-10 11:55:30 +02:00
brian.mulier
0f7360ae81 build(tests): replace workspaces with proper storybook config + working aliases 2025-06-10 11:53:11 +02:00
Florian Hussonnois
938590f31f fix(plugins): check whether plugin registry supports versioning (#9122) 2025-06-10 11:49:40 +02:00
YannC.
b2d1c84a86 fix(): correctly display doc/chart preview when editing a custom dashboard
close #9411
2025-06-10 10:25:41 +02:00
Ludovic DEHON
d7ca302830 feat(system): add server_type as global metrics tags 2025-06-10 09:23:14 +02:00
Roman Acevedo
8656e852cc build(ci): fix setversion workflow not making tag push trigger main 2025-06-09 18:03:49 +02:00
brian-mulier-p
cc72336350 fix(core): avoid adding invalid keys from query parameters to filter (#9383)
closes #9364
2025-06-09 18:03:49 +02:00
Roman Acevedo
316d89764e tests(core): add storybook on executions filters (#9354) 2025-06-09 18:03:49 +02:00
Barthélémy Ledoux
4873bf4d36 chore: upgrade storybook (#9326) 2025-06-09 14:40:21 +02:00
Florian Hussonnois
204bf7f5e1 chore: add script to update gradle kestraVersion prop on plugins 2025-06-09 14:31:45 +02:00
Loïc Mathieu
1e0950fdf8 fix(system): import flow should set the tenantId 2025-06-09 13:51:53 +02:00
github-actions[bot]
4cddc704f4 chore(version): update to version '0.23.0-rc2-SNAPSHOT' 2025-06-09 10:48:43 +00:00
Miloš Paunović
f2f0e29f93 fix(namespaces): properly load flows when changing namespace (#9393)
Closes https://github.com/kestra-io/kestra/issues/9352.
2025-06-09 12:34:36 +02:00
Miloš Paunović
95011e022e fix(namespaces): reload namespace once the id parameter changes (#9372)
Closes https://github.com/kestra-io/kestra-ee/issues/3630.
2025-06-06 12:25:37 +02:00
brian.mulier
65503b708a fix(core): add DefaultFilterLanguage as default in KestraFilter
closes #9365
2025-06-05 17:42:34 +02:00
brian-mulier-p
876b8cb2e6 fix(core): avoid crashing when a taskrun has a too-large value (#9359)
closes #9312
2025-06-05 14:11:37 +02:00
Nicolas K.
f3b7592dfa fix(flows): #9319 error when pause with timeout triggers an execution (#9334)
* fix(flows): #9319 error when pause with timeout triggers an execution even after it's terminated

* fix(flows): only skip paused flow when execution is terminated

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-06-05 10:15:49 +02:00
brian.mulier
4dbeaf86bb fix(core): larger debounce for filter 2025-06-05 09:48:53 +02:00
brian.mulier
f98e78399d fix(core): handle whitespaces in label key and value 2025-06-05 09:48:43 +02:00
brian.mulier
71dac0f311 fix(core): smarter autocomplete order in editor 2025-06-05 09:48:00 +02:00
brian-mulier-p
3077d0ac7a fix(core): additional plugins are now properly shown in plugin docs (#9329)
closes kestra-io/plugin-langchain4j#61
2025-06-05 09:46:57 +02:00
YannC.
9504bbaffe fix(ci): put back bump helm chart and remove if condition 2025-06-05 08:48:56 +02:00
YannC.
159c9373ad fix(ci): checkout actions from main branch 2025-06-04 21:12:56 +02:00
YannC.
55b9088b55 fix(ci): modify actions order 2025-06-04 21:06:17 +02:00
YannC.
601d1a0abb fix(ci): Correctly pass all the secrets through all workflows 2025-06-04 15:10:33 +02:00
Florian Hussonnois
4a1cf98f26 chore(version): bump to version '0.23.0-rc1-SNAPSHOT' 2025-06-04 14:07:30 +02:00
353 changed files with 9051 additions and 5013 deletions

View File

@@ -80,7 +80,6 @@ python3 -m pip install virtualenv
The frontend is made with [Vue.js](https://vuejs.org/) and located on the `/ui` folder.
- `npm install`
- create a file `ui/.env.development.local` with content `VITE_APP_API_URL=http://localhost:8080` (or your actual server url)
- `npm run dev` will start the development server with hot reload.
- The server start by default on port 5173 and is reachable on `http://localhost:5173`
- You can run `npm run build` in order to build the front-end that will be delivered from the backend (without running the `npm run dev`) above.

View File

@@ -1,26 +1,31 @@
# See GitHub's docs for more information on this file:
# https://docs.github.com/en/free-pro-team@latest/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2
updates:
# Maintain dependencies for GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
schedule:
# Check for updates to GitHub Actions every week
interval: "weekly"
day: "wednesday"
time: "08:00"
timezone: "Europe/Paris"
open-pull-requests-limit: 50
labels:
- "dependency-upgrade"
open-pull-requests-limit: 50
# Maintain dependencies for Gradle modules
- package-ecosystem: "gradle"
directory: "/"
schedule:
# Check for updates to Gradle modules every week
interval: "weekly"
day: "wednesday"
time: "08:00"
timezone: "Europe/Paris"
open-pull-requests-limit: 50
labels:
- "dependency-upgrade"
open-pull-requests-limit: 50
# Maintain dependencies for NPM modules
- package-ecosystem: "npm"
@@ -31,8 +36,15 @@ updates:
time: "08:00"
timezone: "Europe/Paris"
open-pull-requests-limit: 50
labels: ["dependency-upgrade"]
labels:
- "dependency-upgrade"
ignore:
# Ignore updates of version 1.x, as we're using beta of 2.x
# Ignore updates of version 1.x, as we're using the beta of 2.x (still in beta)
- dependency-name: "vue-virtual-scroller"
versions: ["1.x"]
versions:
- "1.x"
# Ignore updates to monaco-yaml, version is pinned to 5.3.1 due to patch-package script additions
- dependency-name: "monaco-yaml"
versions:
- ">=5.3.2"

View File

@@ -2,7 +2,7 @@ name: Auto-Translate UI keys and create PR
on:
schedule:
- cron: "0 9-21 * * *" # Every hour from 9 AM to 9 PM
- cron: "0 9-21/3 * * *" # Every 3 hours from 9 AM to 9 PM
workflow_dispatch:
inputs:
retranslate_modified_keys:
@@ -20,13 +20,13 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
name: Checkout
with:
fetch-depth: 0
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: "3.x"
@@ -39,13 +39,10 @@ jobs:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
- name: Set up Node
uses: actions/setup-node@v4
uses: actions/setup-node@v5
with:
node-version: "20.x"
- name: Check keys matching
run: node ui/src/translations/check.js
- name: Set up Git
run: |
git config --global user.name "GitHub Action"
@@ -64,4 +61,7 @@ jobs:
fi
git commit -m "chore(core): localize to languages other than english" -m "Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference."
git push -u origin $BRANCH_NAME || (git push origin --delete $BRANCH_NAME && git push -u origin $BRANCH_NAME)
gh pr create --title "Translations from en.json" --body "This PR was created automatically by a GitHub Action." --base develop --head $BRANCH_NAME --assignee anna-geller --reviewer anna-geller
gh pr create --title "Translations from en.json" --body $'This PR was created automatically by a GitHub Action.\n\nSomeone from the @kestra-io/frontend team needs to review and merge.' --base ${{ github.ref_name }} --head $BRANCH_NAME
- name: Check keys matching
run: node ui/src/translations/check.js

View File

@@ -27,7 +27,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.
@@ -50,7 +50,7 @@ jobs:
# Set up JDK
- name: Set up JDK
uses: actions/setup-java@v4
uses: actions/setup-java@v5
if: ${{ matrix.language == 'java' }}
with:
distribution: 'temurin'

View File

@@ -1,147 +0,0 @@
name: Create Docker images on Release
on:
workflow_dispatch:
inputs:
retag-latest:
description: 'Retag latest Docker images'
required: true
type: string
default: "true"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag'
required: false
type: string
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
jobs:
plugins:
name: List Plugins
runs-on: ubuntu-latest
outputs:
plugins: ${{ steps.plugins.outputs.plugins }}
steps:
# Checkout
- uses: actions/checkout@v4
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
docker:
name: Publish Docker
needs: [ plugins ]
runs-on: ubuntu-latest
strategy:
matrix:
image:
- name: "-no-plugins"
plugins: ""
packages: jattach
python-libs: ""
- name: ""
plugins: ${{needs.plugins.outputs.plugins}}
packages: python3 python-is-python3 python3-pip curl jattach
python-libs: kestra
steps:
- uses: actions/checkout@v4
# Vars
- name: Set image name
id: vars
run: |
if [[ "${{ inputs.release-tag }}" == "" ]]; then
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
else
TAG="${{ inputs.release-tag }}"
echo "tag=${TAG}" >> $GITHUB_OUTPUT
fi
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
echo "plugins=--repositories=https://s01.oss.sonatype.org/content/repositories/snapshots ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# Download release
- name: Download release
uses: robinraju/release-downloader@v1.12
with:
tag: ${{steps.vars.outputs.tag}}
fileName: 'kestra-*'
out-file-path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker setup
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# Docker Build and push
- name: Push to Docker Hub
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
APT_PACKAGES=${{ matrix.image.packages }}
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
- name: Install regctl
if: github.event.inputs.retag-latest == 'true'
uses: regclient/actions/regctl-installer@main
- name: Retag to latest
if: github.event.inputs.retag-latest == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
- docker
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ':github-actions:'
channel: 'C02DQ1A7JLR' # _int_git channel

View File

@@ -1,158 +1,86 @@
name: 'Reusable Workflow for Running End-to-End Tests'
name: 'E2E tests revival'
description: 'New E2E tests implementation started by Roman. Based on playwright in npm UI project, tests Kestra OSS develop docker image. These tests are written from zero, lets make them unflaky from the start!.'
on:
schedule:
- cron: "0 * * * *" # Every hour
workflow_call:
inputs:
tags:
description: "Tags used for filtering tests to include for QA."
type: string
required: true
docker-artifact-name:
description: "The GitHub artifact containing the Kestra docker image."
type: string
noInputYet:
description: 'not input yet.'
required: false
docker-image-tag:
description: "The Docker image Tag for Kestra"
default: 'kestra/kestra:develop'
type: string
required: true
backend:
description: "The Kestra backend type to be used for E2E tests."
type: string
required: true
default: "postgres"
secrets:
GITHUB_AUTH_TOKEN:
description: "The GitHub Token."
required: true
GOOGLE_SERVICE_ACCOUNT:
description: "The Google Service Account."
default: "no input"
workflow_dispatch:
inputs:
noInputYet:
description: 'not input yet.'
required: false
type: string
default: "no input"
jobs:
check:
timeout-minutes: 60
timeout-minutes: 15
runs-on: ubuntu-latest
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
E2E_TEST_DOCKER_DIR: ./kestra/e2e-tests/docker
KESTRA_BASE_URL: http://127.27.27.27:8080/ui/
steps:
# Checkout kestra
- name: Login to DockerHub
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ github.token }}
- name: Checkout kestra
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
path: kestra
# Setup build
- uses: kestra-io/actions/.github/actions/setup-build@main
- uses: kestra-io/actions/composite/setup-build@main
name: Setup - Build
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
# Get Docker Image
- name: Download Kestra Image
if: inputs.docker-artifact-name != ''
uses: actions/download-artifact@v4
with:
name: ${{ inputs.docker-artifact-name }}
path: /tmp
- name: Load Kestra Image
if: inputs.docker-artifact-name != ''
- name: Install Npm dependencies
run: |
docker load --input /tmp/${{ inputs.docker-artifact-name }}.tar
# Docker Compose
- name: Login to DockerHub
uses: docker/login-action@v3
if: inputs.docker-artifact-name == ''
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ github.token }}
# Build configuration
- name: Create additional application configuration
run: |
touch ${{ env.E2E_TEST_DOCKER_DIR }}/data/application-secrets.yml
- name: Setup additional application configuration
if: env.APPLICATION_SECRETS != null
env:
APPLICATION_SECRETS: ${{ secrets.APPLICATION_SECRETS }}
run: |
echo $APPLICATION_SECRETS | base64 -d > ${{ env.E2E_TEST_DOCKER_DIR }}/data/application-secrets.yml
# Deploy Docker Compose Stack
- name: Run Kestra (${{ inputs.backend }})
env:
KESTRA_DOCKER_IMAGE: ${{ inputs.docker-image-tag }}
run: |
cd ${{ env.E2E_TEST_DOCKER_DIR }}
echo "KESTRA_DOCKER_IMAGE=$KESTRA_DOCKER_IMAGE" >> .env
docker compose -f docker-compose-${{ inputs.backend }}.yml up -d
- name: Install Playwright Deps
run: |
cd kestra
./gradlew playwright --args="install-deps"
# Run E2E Tests
- name: Wait For Kestra UI
run: |
# Start time
START_TIME=$(date +%s)
# Timeout duration in seconds (5 minutes)
TIMEOUT_DURATION=$((5 * 60))
while [ $(curl -s -L -o /dev/null -w %{http_code} $KESTRA_BASE_URL) != 200 ]; do
echo -e $(date) "\tKestra server HTTP state: " $(curl -k -L -s -o /dev/null -w %{http_code} $KESTRA_BASE_URL) " (waiting for 200)";
# Check the elapsed time
CURRENT_TIME=$(date +%s)
ELAPSED_TIME=$((CURRENT_TIME - START_TIME))
# Break the loop if the elapsed time exceeds the timeout duration
if [ $ELAPSED_TIME -ge $TIMEOUT_DURATION ]; then
echo "Timeout reached: Exiting after 5 minutes."
exit 1;
fi
sleep 2;
done;
echo "Kestra is running: $KESTRA_BASE_URL 🚀";
continue-on-error: true
- name: Run E2E Tests (${{ inputs.tags }})
if: inputs.tags != ''
run: |
cd kestra
./gradlew e2eTestsCheck -P tags=${{ inputs.tags }}
cd kestra/ui
npm i
npx playwright install --with-deps chromium
- name: Run E2E Tests
if: inputs.tags == ''
run: |
cd kestra
./gradlew e2eTestsCheck
sh build-and-start-e2e-tests.sh
# Allure check
- name: Auth to Google Cloud
id: auth
if: ${{ !cancelled() && env.GOOGLE_SERVICE_ACCOUNT != 0 }}
uses: 'google-github-actions/auth@v2'
- name: Upload Playwright Report as Github artifact
# 'With this report, you can analyze locally the results of the tests. see https://playwright.dev/docs/ci-intro#html-report'
uses: actions/upload-artifact@v4
if: ${{ !cancelled() }}
with:
credentials_json: '${{ secrets.GOOGLE_SERVICE_ACCOUNT }}'
- uses: rlespinasse/github-slug-action@v5
- name: Publish allure report
uses: andrcuns/allure-publish-action@v2.9.0
if: ${{ !cancelled() && env.GOOGLE_SERVICE_ACCOUNT != 0 }}
env:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
JAVA_HOME: /usr/lib/jvm/default-jvm/
with:
storageType: gcs
resultsGlob: build/allure-results
bucket: internal-kestra-host
baseUrl: "https://internal.dev.kestra.io"
prefix: ${{ format('{0}/{1}/{2}', github.repository, env.GITHUB_HEAD_REF_SLUG != '' && env.GITHUB_HEAD_REF_SLUG || github.ref_name, 'allure/playwright') }}
copyLatest: true
ignoreMissingResults: true
name: playwright-report
path: kestra/ui/playwright-report/
retention-days: 7
# Allure check
# TODO I don't know what it should do
# - uses: rlespinasse/github-slug-action@v5
# name: Allure - Generate slug variables
#
# - name: Allure - Publish report
# uses: andrcuns/allure-publish-action@v2.9.0
# if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
# continue-on-error: true
# env:
# GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
# JAVA_HOME: /usr/lib/jvm/default-jvm/
# with:
# storageType: gcs
# resultsGlob: "**/build/allure-results"
# bucket: internal-kestra-host
# baseUrl: "https://internal.dev.kestra.io"
# prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
# copyLatest: true
# ignoreMissingResults: true

View File

@@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.0-rc1)'
description: 'The release version (e.g., 0.21.0)'
required: true
type: string
nextVersion:
@@ -21,25 +21,17 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
- uses: kestra-io/actions/composite/setup-build@main
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
caches-enabled: true
# Get Plugins List
- name: Get Plugins List
@@ -60,7 +52,7 @@ jobs:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
@@ -73,10 +65,10 @@ jobs:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
--dry-run \
--yes \
${{ steps.plugins-list.outputs.repositories }}
${{ steps.plugins-list.outputs.repositories }}

View File

@@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.0-rc1)'
description: 'The release version (e.g., 0.21.0)'
required: true
type: string
nextVersion:
@@ -23,8 +23,8 @@ jobs:
# Checks
- name: Check Inputs
run: |
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$ ]]; then
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$"
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0$ ]]; then
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0$"
exit 1
fi
@@ -33,20 +33,13 @@ jobs:
exit 1;
fi
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
path: kestra
# Checkout GitHub Actions
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
- uses: kestra-io/actions/composite/setup-build@main
id: build
with:
java-enabled: true
@@ -78,7 +71,6 @@ jobs:
git checkout develop;
if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
# -SNAPSHOT qualifier maybe used to test release-candidates
./gradlew release -Prelease.useAutomaticVersion=true \
-Prelease.releaseVersion="${RELEASE_VERSION}" \
-Prelease.newVersion="${NEXT_VERSION}" \
@@ -89,4 +81,4 @@ jobs:
-Prelease.releaseVersion="${RELEASE_VERSION}" \
-Prelease.newVersion="${NEXT_VERSION}" \
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
fi
fi

View File

@@ -3,10 +3,17 @@ name: Main Workflow
on:
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: true
description: "plugins version"
required: false
type: string
push:
branches:
@@ -25,16 +32,17 @@ jobs:
tests:
name: Execute tests
uses: ./.github/workflows/workflow-test.yml
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
with:
report-status: false
release:
name: Release
needs: [tests]
if: "!startsWith(github.ref, 'refs/heads/releases')"
if: "!failure() && !cancelled() && !startsWith(github.ref, 'refs/heads/releases')"
uses: ./.github/workflows/workflow-release.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
@@ -43,32 +51,26 @@ jobs:
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
end:
runs-on: ubuntu-latest
needs:
- release
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
# Update
- name: Github - Update internal
uses: benc-uk/workflow-dispatch@v1
if: github.ref == 'refs/heads/develop' && needs.docker.result == 'success'
- name: Trigger EE Workflow
uses: peter-evans/repository-dispatch@v3
if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
with:
workflow: oss-build.yml
repo: kestra-io/infra
ref: master
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/kestra-ee
event-type: "oss-updated"
# Slack
- name: Slack - Notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
uses: kestra-io/actions/composite/slack-status@main
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ":github-actions:"
channel: "C02DQ1A7JLR" # _int_git channel
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -4,6 +4,7 @@ on:
pull_request:
branches:
- develop
- releases/*
concurrency:
group: ${{ github.workflow }}-${{ github.ref_name }}-pr
@@ -56,19 +57,7 @@ jobs:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
end:
name: End
runs-on: ubuntu-latest
if: always()
needs: [frontend, backend]
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ":github-actions:"
channel: "C02DQ1A7JLR"
e2e-tests:
name: E2E - Tests
uses: ./.github/workflows/e2e.yml

View File

@@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0

View File

@@ -22,11 +22,11 @@ jobs:
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)(\.[0-9]+)-(rc[0-9])?(-SNAPSHOT)?$"
exit 1
fi
# Extract the major and minor versions
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
RELEASE_BRANCH="refs/heads/releases/v${BASE_VERSION}.x"
CURRENT_BRANCH="$GITHUB_REF"
if ! [[ "$CURRENT_BRANCH" == "$RELEASE_BRANCH" ]]; then
echo "Invalid release branch. Expected $RELEASE_BRANCH, was $CURRENT_BRANCH"
@@ -34,11 +34,14 @@ jobs:
fi
# Checkout
- uses: actions/checkout@v4
- name: Checkout
uses: actions/checkout@v5
with:
fetch-depth: 0
token: ${{ secrets.GH_PERSONAL_TOKEN }}
- name: Configure Git
# Configure
- name: Git - Configure
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
@@ -54,4 +57,4 @@ jobs:
git commit -m"chore(version): update to version '$RELEASE_VERSION'"
git push
git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
git push origin "v$RELEASE_VERSION"
git push --tags

View File

@@ -17,17 +17,10 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
id: build
@@ -66,19 +59,12 @@ jobs:
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
- uses: kestra-io/actions/composite/setup-build@main
id: build
with:
java-enabled: false
@@ -87,7 +73,7 @@ jobs:
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@0.30.0
uses: aquasecurity/trivy-action@0.33.1
with:
image-ref: kestra/kestra:develop
format: 'template'
@@ -111,28 +97,20 @@ jobs:
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
- uses: kestra-io/actions/composite/setup-build@main
id: build
with:
java-enabled: false
node-enabled: false
caches-enabled: true
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@0.30.0
uses: aquasecurity/trivy-action@0.33.1
with:
image-ref: kestra/kestra:latest
format: table

View File

@@ -20,6 +20,7 @@ permissions:
contents: write
checks: write
actions: read
pull-requests: write
jobs:
test:
@@ -29,13 +30,13 @@ jobs:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
name: Checkout - Current ref
with:
fetch-depth: 0
# Setup build
- uses: kestra-io/actions/.github/actions/setup-build@main
- uses: kestra-io/actions/composite/setup-build@main
name: Setup - Build
id: build
with:
@@ -59,84 +60,15 @@ jobs:
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
./gradlew check javadoc --parallel
# report test
- name: Test - Publish Test Results
uses: dorny/test-reporter@v2
if: always()
with:
name: Java Tests Report
reporter: java-junit
path: '**/build/test-results/test/TEST-*.xml'
list-suites: 'failed'
list-tests: 'failed'
fail-on-error: 'false'
token: ${{ secrets.GITHUB_AUTH_TOKEN }}
# Sonar
- name: Test - Analyze with Sonar
if: env.SONAR_TOKEN != ''
- name: comment PR with test report
if: ${{ !cancelled() && github.event_name == 'pull_request' }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
shell: bash
run: ./gradlew sonar --info
run: npx --yes @kestra-io/kestra-devtools generateTestReportSummary --only-errors --ci $(pwd)
# GCP
- name: GCP - Auth with unit test account
id: auth
if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
uses: "google-github-actions/auth@v2"
with:
credentials_json: "${{ secrets.GOOGLE_SERVICE_ACCOUNT }}"
- name: GCP - Setup Cloud SDK
if: env.GOOGLE_SERVICE_ACCOUNT != ''
uses: "google-github-actions/setup-gcloud@v2"
# Allure check
- uses: rlespinasse/github-slug-action@v5
name: Allure - Generate slug variables
- name: Allure - Publish report
uses: andrcuns/allure-publish-action@v2.9.0
if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
env:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
JAVA_HOME: /usr/lib/jvm/default-jvm/
with:
storageType: gcs
resultsGlob: "**/build/allure-results"
bucket: internal-kestra-host
baseUrl: "https://internal.dev.kestra.io"
prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
copyLatest: true
ignoreMissingResults: true
# Jacoco
- name: Jacoco - Copy reports
if: env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
shell: bash
run: |
mv build/reports/jacoco/testCodeCoverageReport build/reports/jacoco/test/
mv build/reports/jacoco/test/testCodeCoverageReport.xml build/reports/jacoco/test/jacocoTestReport.xml
gsutil -m rsync -d -r build/reports/jacoco/test/ gs://internal-kestra-host/${{ format('{0}/{1}', github.repository, 'jacoco') }}
# Codecov
- name: Codecov - Upload coverage reports
uses: codecov/codecov-action@v5
# Report Java
- name: Report - Java
uses: kestra-io/actions/composite/report-java@main
if: ${{ !cancelled() }}
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: backend
- name: Codecov - Upload test results
uses: codecov/test-results-action@v1
if: ${{ !cancelled() }}
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: backend
secrets: ${{ toJSON(secrets) }}

View File

@@ -1,23 +1,7 @@
name: Build Artifacts
on:
workflow_call:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: true
type: string
outputs:
docker-tag:
value: ${{ jobs.build.outputs.docker-tag }}
description: "The Docker image Tag for Kestra"
docker-artifact-name:
value: ${{ jobs.build.outputs.docker-artifact-name }}
description: "The GitHub artifact containing the Kestra docker image name."
plugins:
value: ${{ jobs.build.outputs.plugins }}
description: "The Kestra plugins list used for the build."
workflow_call: {}
jobs:
build:
@@ -31,7 +15,7 @@ jobs:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
fetch-depth: 0
@@ -42,7 +26,7 @@ jobs:
run: npm ci
# Setup build
- uses: kestra-io/actions/.github/actions/setup-build@main
- uses: kestra-io/actions/composite/setup-build@main
name: Setup - Build
id: build
with:
@@ -68,7 +52,7 @@ jobs:
if [[ $TAG = "master" || $TAG == v* ]]; then
echo "plugins=$PLUGINS" >> $GITHUB_OUTPUT
else
echo "plugins=--repositories=https://s01.oss.sonatype.org/content/repositories/snapshots $PLUGINS" >> $GITHUB_OUTPUT
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
fi
# Build
@@ -82,55 +66,6 @@ jobs:
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker Tag
- name: Setup - Docker vars
id: vars
shell: bash
run: |
TAG=${GITHUB_REF#refs/*/}
if [[ $TAG = "master" ]]
then
TAG="latest";
elif [[ $TAG = "develop" ]]
then
TAG="develop";
elif [[ $TAG = v* ]]
then
TAG="${TAG}";
else
TAG="build-${{ github.run_id }}";
fi
echo "tag=${TAG}" >> $GITHUB_OUTPUT
echo "artifact=docker-kestra-${TAG}" >> $GITHUB_OUTPUT
# Docker setup
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Docker - Setup Buildx
uses: docker/setup-buildx-action@v3
# Docker Build
- name: Docker - Build & export image
uses: docker/build-push-action@v6
if: "!startsWith(github.ref, 'refs/tags/v')"
with:
context: .
push: false
file: Dockerfile
tags: |
kestra/kestra:${{ steps.vars.outputs.tag }}
build-args: |
KESTRA_PLUGINS=${{ steps.plugins.outputs.plugins }}
APT_PACKAGES=${{ env.DOCKER_APT_PACKAGES }}
PYTHON_LIBRARIES=${{ env.DOCKER_PYTHON_LIBRARIES }}
outputs: type=docker,dest=/tmp/${{ steps.vars.outputs.artifact }}.tar
# Upload artifacts
- name: Artifacts - Upload JAR
uses: actions/upload-artifact@v4
@@ -143,10 +78,3 @@ jobs:
with:
name: exe
path: build/executable/
- name: Artifacts - Upload Docker
uses: actions/upload-artifact@v4
if: "!startsWith(github.ref, 'refs/tags/v')"
with:
name: ${{ steps.vars.outputs.artifact }}
path: /tmp/${{ steps.vars.outputs.artifact }}.tar

View File

@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v5
- name: Cache Node Modules
id: cache-node-modules
@@ -39,7 +39,6 @@ jobs:
key: playwright-${{ hashFiles('ui/package-lock.json') }}
- name: Npm - install
shell: bash
if: steps.cache-node-modules.outputs.cache-hit != 'true'
working-directory: ui
run: npm ci
@@ -52,35 +51,20 @@ jobs:
workdir: ui
- name: Npm - Run build
shell: bash
working-directory: ui
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
run: npm run build
- name: Run front-end unit tests
working-directory: ui
run: npm run test:unit -- --coverage
- name: Storybook - Install Playwright
shell: bash
working-directory: ui
if: steps.cache-playwright.outputs.cache-hit != 'true'
run: npx playwright install --with-deps
- name: Run front-end unit tests
shell: bash
- name: Run storybook component tests
working-directory: ui
run: npm run test:cicd
- name: Codecov - Upload coverage reports
uses: codecov/codecov-action@v5
if: ${{ !cancelled() && github.event.pull_request.head.repo.full_name == github.repository }}
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: frontend
- name: Codecov - Upload test results
uses: codecov/test-results-action@v1
if: ${{ !cancelled() }}
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN && github.event.pull_request.head.repo.full_name == github.repository }}
flags: frontend
run: npm run test:storybook -- --coverage

View File

@@ -1,51 +1,79 @@
name: Github - Release
on:
workflow_dispatch:
workflow_call:
secrets:
GH_PERSONAL_TOKEN:
description: "The Github personal token."
required: true
push:
tags:
- '*'
SLACK_RELEASES_WEBHOOK_URL:
description: "The Slack webhook URL."
required: true
jobs:
publish:
name: Github - Release
runs-on: ubuntu-latest
steps:
# Check out
- name: Checkout - Repository
uses: actions/checkout@v5
with:
fetch-depth: 0
submodules: true
# Download Exec
# Must be done after checkout actions
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
uses: actions/download-artifact@v5
if: startsWith(github.ref, 'refs/tags/v')
with:
name: exe
path: build/executable
# Check out
- name: Checkout - Repository
uses: actions/checkout@v4
with:
fetch-depth: 0
submodules: true
# Checkout GitHub Actions
- name: Checkout - Actions
uses: actions/checkout@v4
with:
repository: kestra-io/actions
sparse-checkout-cone-mode: true
ref: fix/core-release
path: actions
sparse-checkout: |
.github/actions
- name: Check if current tag is latest
id: is_latest
run: |
latest_tag=$(git tag | grep -E '^v[0-9]+\.[0-9]+\.[0-9]+$' | sed 's/^v//' | sort -V | tail -n1)
current_tag="${GITHUB_REF_NAME#v}"
if [ "$current_tag" = "$latest_tag" ]; then
echo "latest=true" >> $GITHUB_OUTPUT
else
echo "latest=false" >> $GITHUB_OUTPUT
fi
env:
GITHUB_REF_NAME: ${{ github.ref_name }}
# GitHub Release
- name: Create GitHub release
uses: ./actions/.github/actions/github-release
uses: kestra-io/actions/composite/github-release@main
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
env:
MAKE_LATEST: ${{ steps.is_latest.outputs.latest }}
GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
# Trigger gha workflow to bump helm chart version
- name: GitHub - Trigger the Helm chart version bump
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/helm-charts
event-type: update-helm-chart-version
client-payload: |-
{
"new_version": "${{ github.ref_name }}",
"github_repository": "${{ github.repository }}",
"github_actor": "${{ github.actor }}"
}
- name: Merge Release Notes
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
uses: kestra-io/actions/composite/github-release-note-merge@main
env:
GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
RELEASE_TAG: ${{ github.ref_name }}

View File

@@ -1,22 +1,45 @@
name: Publish - Docker
name: Create Docker images on Release
on:
workflow_dispatch:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
retag-latest:
description: 'Retag latest Docker images'
required: true
type: choice
default: "false"
options:
- "true"
- "false"
retag-lts:
description: 'Retag LTS Docker images'
required: true
type: choice
default: "false"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag (by default, deduced from the ref)'
required: false
type: string
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
force-download-artifact:
description: 'Force download artifact'
required: false
type: string
type: choice
default: "true"
options:
- "true"
- "false"
workflow_call:
inputs:
plugin-version:
description: "Kestra version"
description: "Plugin version"
default: 'LATEST'
required: false
type: string
@@ -33,47 +56,93 @@ on:
description: "The Dockerhub password."
required: true
env:
PLUGIN_VERSION: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
jobs:
plugins:
name: List Plugins
runs-on: ubuntu-latest
outputs:
plugins: ${{ steps.plugins.outputs.plugins }}
steps:
# Checkout
- uses: actions/checkout@v5
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins
with: # remap LATEST-SNAPSHOT to LATEST
plugin-version: ${{ env.PLUGIN_VERSION == 'LATEST-SNAPSHOT' && 'LATEST' || env.PLUGIN_VERSION }}
# ********************************************************************************************************************
# Build
# ********************************************************************************************************************
build-artifacts:
name: Build Artifacts
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
if: ${{ inputs.force-download-artifact == 'true' }}
uses: ./.github/workflows/workflow-build-artifacts.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
# ********************************************************************************************************************
# Docker
# ********************************************************************************************************************
publish:
name: Publish - Docker
docker:
name: Publish Docker
needs: [ plugins, build-artifacts ]
if: always()
runs-on: ubuntu-latest
needs: build-artifacts
if: |
always() &&
(needs.build-artifacts.result == 'success' ||
github.event.inputs.force-download-artifact != 'true')
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
strategy:
matrix:
image:
- tag: -no-plugins
- name: "-no-plugins"
plugins: ""
packages: jattach
plugins: false
python-libraries: ""
- tag: ""
plugins: true
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
python-libraries: kestra
python-libs: ""
- name: ""
plugins: ${{needs.plugins.outputs.plugins}}
packages: python3 python-is-python3 python3-pip curl jattach
python-libs: kestra
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
- uses: actions/checkout@v5
# Vars
- name: Set image name
id: vars
run: |
if [[ "${{ inputs.release-tag }}" == "" ]]; then
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
else
TAG="${{ inputs.release-tag }}"
echo "tag=${TAG}" >> $GITHUB_OUTPUT
fi
if [[ $GITHUB_REF == refs/tags/* ]]; then
if [[ $TAG =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
# this will remove the patch version number
MINOR_SEMVER=${TAG%.*}
echo "minor_semver=${MINOR_SEMVER}" >> $GITHUB_OUTPUT
else
echo "Tag '$TAG' is not a valid semver (vMAJOR.MINOR.PATCH), skipping minor_semver"
fi
fi
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# Download executable from artifact
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker setup
- name: Docker - Setup QEMU
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
@@ -81,66 +150,59 @@ jobs:
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Docker - Setup Docker Buildx
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Docker - Login to DockerHub
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# Get Plugins List
- name: Plugins - Get List
uses: ./.github/actions/plugins-list
id: plugins-list
if: ${{ matrix.image.plugins}}
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
# Vars
- name: Docker - Set variables
shell: bash
id: vars
run: |
TAG=${GITHUB_REF#refs/*/}
PLUGINS="${{ matrix.image.plugins == true && steps.plugins-list.outputs.plugins || '' }}"
if [[ $TAG == v* ]]; then
TAG="${TAG}";
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
elif [[ $TAG = "develop" ]]; then
TAG="develop";
echo "plugins=--repositories=https://s01.oss.sonatype.org/content/repositories/snapshots $PLUGINS" >> $GITHUB_OUTPUT
else
TAG="build-${{ github.run_id }}";
echo "plugins=--repositories=https://s01.oss.sonatype.org/content/repositories/snapshots $PLUGINS" >> $GITHUB_OUTPUT
fi
echo "tag=${TAG}${{ matrix.image.tag }}" >> $GITHUB_OUTPUT
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
with:
name: exe
path: build/executable
- name: Docker - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker Build and push
- name: Docker - Build image
- name: Push to Docker Hub
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: kestra/kestra:${{ steps.vars.outputs.tag }}
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
APT_PACKAGES=${{ matrix.image.packages }}
PYTHON_LIBRARIES=${{ matrix.image.python-libraries }}
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
- name: Install regctl
if: startsWith(github.ref, 'refs/tags/v')
uses: regclient/actions/regctl-installer@main
- name: Retag to minor semver version
if: startsWith(github.ref, 'refs/tags/v') && steps.vars.outputs.minor_semver != ''
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.minor_semver, matrix.image.name) }}
- name: Retag to latest
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-latest == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
- name: Retag to LTS
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-lts == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest-lts{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
- docker
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
- name: Slack notification
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 }}
uses: kestra-io/actions/composite/slack-status@main
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -25,11 +25,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
uses: actions/checkout@v5
# Setup build
- name: Setup - Build
uses: kestra-io/actions/.github/actions/setup-build@main
uses: kestra-io/actions/composite/setup-build@main
id: build
with:
java-enabled: true
@@ -39,8 +39,8 @@ jobs:
- name: Publish - Release package to Maven Central
shell: bash
env:
ORG_GRADLE_PROJECT_sonatypeUsername: ${{ secrets.SONATYPE_USER }}
ORG_GRADLE_PROJECT_sonatypePassword: ${{ secrets.SONATYPE_PASSWORD }}
ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE}}
@@ -50,7 +50,7 @@ jobs:
echo "signing.password=${SONATYPE_GPG_PASSWORD}" >> ~/.gradle/gradle.properties
echo "signing.secretKeyRingFile=${HOME}/.gradle/secring.gpg" >> ~/.gradle/gradle.properties
echo ${SONATYPE_GPG_FILE} | base64 -d > ~/.gradle/secring.gpg
./gradlew publishToSonatype ${{ startsWith(github.ref, 'refs/tags/v') && 'closeAndReleaseSonatypeStagingRepository' || '' }}
./gradlew publishToMavenCentral
# Gradle dependency
- name: Java - Gradle dependency graph

View File

@@ -0,0 +1,16 @@
name: Pull Request - Delete Docker
on:
pull_request:
types: [closed]
jobs:
publish:
name: Pull Request - Delete Docker
if: github.repository == 'kestra-io/kestra' # prevent running on forks
runs-on: ubuntu-latest
steps:
- uses: dataaxiom/ghcr-cleanup-action@v1
with:
package: kestra-pr
delete-tags: ${{ github.event.pull_request.number }}

View File

@@ -0,0 +1,78 @@
name: Pull Request - Publish Docker
on:
pull_request:
branches:
- develop
jobs:
build-artifacts:
name: Build Artifacts
if: github.repository == 'kestra-io/kestra' # prevent running on forks
uses: ./.github/workflows/workflow-build-artifacts.yml
publish:
name: Publish Docker
if: github.repository == 'kestra-io/kestra' # prevent running on forks
runs-on: ubuntu-latest
needs: build-artifacts
env:
GITHUB_IMAGE_PATH: "ghcr.io/kestra-io/kestra-pr"
steps:
- name: Checkout - Current ref
uses: actions/checkout@v5
with:
fetch-depth: 0
# Docker setup
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Setup Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to GHCR
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Docker - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
- name: Docker - Build image
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile.pr
push: true
tags: ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}
platforms: linux/amd64,linux/arm64
# Add comment on pull request
- name: Add comment to PR
uses: actions/github-script@v8
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
await github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: `**🐋 Docker image**: \`${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}\`\n` +
`\n` +
`\`\`\`bash\n` +
`docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }} server local\n` +
`\`\`\``
})

View File

@@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
plugin-version:
description: "Kestra version"
description: "plugins version"
default: 'LATEST'
required: false
type: string
@@ -16,7 +16,7 @@ on:
workflow_call:
inputs:
plugin-version:
description: "Kestra version"
description: "plugins version"
default: 'LATEST'
required: false
type: string
@@ -42,21 +42,25 @@ on:
SONATYPE_GPG_FILE:
description: "The Sonatype GPG file."
required: true
GH_PERSONAL_TOKEN:
description: "GH personnal Token."
required: true
SLACK_RELEASES_WEBHOOK_URL:
description: "Slack webhook for releases channel."
required: true
jobs:
build-artifacts:
name: Build - Artifacts
uses: ./.github/workflows/workflow-build-artifacts.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
Docker:
name: Publish Docker
needs: build-artifacts
uses: ./.github/workflows/workflow-publish-docker.yml
if: startsWith(github.ref, 'refs/heads/develop') || github.event.inputs.publish-docker == 'true'
if: github.ref == 'refs/heads/develop' || inputs.publish-docker == 'true'
with:
force-download-artifact: 'false'
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
plugin-version: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
@@ -77,4 +81,5 @@ jobs:
if: startsWith(github.ref, 'refs/tags/v')
uses: ./.github/workflows/workflow-github-release.yml
secrets:
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}

View File

@@ -27,7 +27,7 @@ jobs:
ui: ${{ steps.changes.outputs.ui }}
backend: ${{ steps.changes.outputs.backend }}
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
if: "!startsWith(github.ref, 'refs/tags/v')"
- uses: dorny/paths-filter@v3
if: "!startsWith(github.ref, 'refs/tags/v')"
@@ -84,14 +84,12 @@ jobs:
name: Notify - Slack
runs-on: ubuntu-latest
needs: [ frontend, backend ]
if: github.event_name == 'schedule'
steps:
- name: Notify failed CI
id: send-ci-failed
if: |
always() && (needs.frontend.result != 'success' ||
needs.backend.result != 'success')
uses: kestra-io/actions/.github/actions/send-ci-failed@main
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
always() &&
(needs.frontend.result != 'success' || needs.backend.result != 'success') &&
(github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop')
uses: kestra-io/actions/composite/slack-status@main
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -61,6 +61,7 @@
#plugin-jenkins:io.kestra.plugin:plugin-jenkins:LATEST
#plugin-jira:io.kestra.plugin:plugin-jira:LATEST
#plugin-kafka:io.kestra.plugin:plugin-kafka:LATEST
#plugin-kestra:io.kestra.plugin:plugin-kestra:LATEST
#plugin-kubernetes:io.kestra.plugin:plugin-kubernetes:LATEST
#plugin-langchain4j:io.kestra.plugin:plugin-langchain4j:LATEST
#plugin-ldap:io.kestra.plugin:plugin-ldap:LATEST

View File

@@ -31,12 +31,10 @@ plugins {
id 'com.github.node-gradle.node' version '7.1.0'
// release
id "io.github.gradle-nexus.publish-plugin" version "2.0.0"
id 'net.researchgate.release' version '3.1.0'
id "com.gorylenko.gradle-git-properties" version "2.5.0"
id 'signing'
id 'ru.vyarus.pom' version '3.0.0' apply false
id 'ru.vyarus.github-info' version '2.0.0' apply false
id "com.vanniktech.maven.publish" version "0.33.0"
// OWASP dependency check
id "org.owasp.dependencycheck" version "12.1.1" apply false
@@ -414,6 +412,7 @@ distTar.dependsOn shadowJar
startScripts.dependsOn shadowJar
startShadowScripts.dependsOn jar
shadowJar.dependsOn 'ui:assembleFrontend'
shadowJar.dependsOn jar
/**********************************************************************************************************************\
* Executable Jar
@@ -484,24 +483,11 @@ tasks.register('runStandalone', JavaExec) {
/**********************************************************************************************************************\
* Publish
**********************************************************************************************************************/
nexusPublishing {
repositoryDescription = "${project.group}:${rootProject.name}:${project.version}"
useStaging = !project.version.endsWith("-SNAPSHOT")
repositories {
sonatype {
nexusUrl.set(uri("https://s01.oss.sonatype.org/service/local/"))
snapshotRepositoryUrl.set(uri("https://s01.oss.sonatype.org/content/repositories/snapshots/"))
}
}
}
subprojects {subProject ->
subprojects {
if (it.name != 'jmh-benchmarks') {
apply plugin: "maven-publish"
if (subProject.name != 'jmh-benchmarks' && subProject.name != rootProject.name) {
apply plugin: 'signing'
apply plugin: 'ru.vyarus.pom'
apply plugin: 'ru.vyarus.github-info'
apply plugin: "com.vanniktech.maven.publish"
javadoc {
options {
@@ -535,56 +521,120 @@ subprojects {
}
}
github {
user 'kestra-io'
license 'Apache'
repository 'kestra'
site 'https://kestra.io'
//These modules should not be published
def unpublishedModules = ["jdbc-mysql", "jdbc-postgres", "webserver"]
if (subProject.name in unpublishedModules){
return
}
maven.pom {
description = 'The modern, scalable orchestrator & scheduler open source platform'
mavenPublishing {
publishToMavenCentral(true)
signAllPublications()
developers {
developer {
id = "tchiotludo"
name = "Ludovic Dehon"
coordinates(
"${rootProject.group}",
subProject.name == "cli" ? rootProject.name : subProject.name,
"${rootProject.version}"
)
pom {
name = project.name
description = "${project.group}:${project.name}:${rootProject.version}"
url = "https://github.com/kestra-io/${rootProject.name}"
licenses {
license {
name = "The Apache License, Version 2.0"
url = "http://www.apache.org/licenses/LICENSE-2.0.txt"
}
}
developers {
developer {
id = "tchiotludo"
name = "Ludovic Dehon"
email = "ldehon@kestra.io"
}
}
scm {
connection = 'scm:git:'
url = "https://github.com/kestra-io/${rootProject.name}"
}
}
}
publishing {
publications {
sonatypePublication(MavenPublication) {
version project.version
afterEvaluate {
publishing {
publications {
withType(MavenPublication).configureEach { publication ->
if (project.name.contains('cli')) {
groupId "io.kestra"
artifactId "kestra"
artifact shadowJar
artifact executableJar
} else if (project.name.contains('platform')){
groupId project.group
artifactId project.name
} else {
from components.java
groupId project.group
artifactId project.name
artifact sourcesJar
artifact javadocJar
artifact testsJar
if (subProject.name == "platform") {
// Clear all artifacts except the BOM
publication.artifacts.clear()
}
}
}
}
}
signing {
// only sign JARs that we publish to Sonatype
required { gradle.taskGraph.hasTask("publishSonatypePublicationPublicationToSonatypeRepository") }
sign publishing.publications.sonatypePublication
if (subProject.name == 'cli') {
/* Make sure the special publication is wired *after* every plugin */
subProject.afterEvaluate {
/* 1. Remove the default java component so Gradle stops expecting
the standard cli-*.jar, sources, javadoc, etc. */
components.removeAll { it.name == "java" }
/* 2. Replace the publications artifacts with shadow + exec */
publishing.publications.withType(MavenPublication).configureEach { pub ->
pub.artifacts.clear()
// main shadow JAR built at root
pub.artifact(rootProject.tasks.named("shadowJar").get()) {
extension = "jar"
}
// executable ZIP built at root
pub.artifact(rootProject.tasks.named("executableJar").get().archiveFile) {
classifier = "exec"
extension = "zip"
}
pub.artifact(tasks.named("sourcesJar").get())
pub.artifact(tasks.named("javadocJar").get())
}
/* 3. Disable Gradle-module metadata for this publication to
avoid the “artifact removed from java component” error. */
tasks.withType(GenerateModuleMetadata).configureEach { it.enabled = false }
/* 4. Make every publish task in :cli wait for the two artifacts */
tasks.matching { it.name.startsWith("publish") }.configureEach {
dependsOn rootProject.tasks.named("shadowJar")
dependsOn rootProject.tasks.named("executableJar")
}
}
}
if (subProject.name != 'platform' && subProject.name != 'cli') {
// only if a test source set actually exists (avoids empty artifacts)
def hasTests = subProject.extensions.findByName('sourceSets')?.findByName('test') != null
if (hasTests) {
// wire the artifact onto every Maven publication of this subproject
publishing {
publications {
withType(MavenPublication).configureEach { pub ->
// keep the normal java component + sources/javadoc already configured
pub.artifact(subProject.tasks.named('testsJar').get())
}
}
}
// make sure publish tasks build the tests jar first
tasks.matching { it.name.startsWith('publish') }.configureEach {
dependsOn subProject.tasks.named('testsJar')
}
}
}
tasks.withType(GenerateModuleMetadata).configureEach {
@@ -595,6 +645,7 @@ subprojects {
}
/**********************************************************************************************************************\
* Version
**********************************************************************************************************************/

View File

@@ -37,4 +37,4 @@ dependencies {
//test
testImplementation "org.wiremock:wiremock-jetty12"
}
}

View File

@@ -1,7 +1,5 @@
package io.kestra.cli;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.HttpHeaders;
import io.micronaut.http.HttpRequest;
@@ -16,16 +14,15 @@ import io.micronaut.http.netty.body.NettyJsonHandler;
import io.micronaut.json.JsonMapper;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import lombok.Builder;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
import picocli.CommandLine;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import lombok.Builder;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
import picocli.CommandLine;
public abstract class AbstractApiCommand extends AbstractCommand {
@CommandLine.Option(names = {"--server"}, description = "Kestra server url", defaultValue = "http://localhost:8080")
@@ -37,7 +34,7 @@ public abstract class AbstractApiCommand extends AbstractCommand {
@CommandLine.Option(names = {"--user"}, paramLabel = "<user:password>", description = "Server user and password")
protected String user;
@CommandLine.Option(names = {"--tenant"}, description = "Tenant identifier (EE only, when multi-tenancy is enabled)")
@CommandLine.Option(names = {"--tenant"}, description = "Tenant identifier (EE only)")
protected String tenantId;
@CommandLine.Option(names = {"--api-token"}, description = "API Token (EE only).")
@@ -87,12 +84,12 @@ public abstract class AbstractApiCommand extends AbstractCommand {
return request;
}
protected String apiUri(String path) {
protected String apiUri(String path, String tenantId) {
if (path == null || !path.startsWith("/")) {
throw new IllegalArgumentException("'path' must be non-null and start with '/'");
}
return tenantId == null ? "/api/v1/" + MAIN_TENANT + path : "/api/v1/" + tenantId + path;
return "/api/v1/" + tenantId + path;
}
@Builder
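The two apiUri signatures above show the shape of the change: the helper no longer defaults to the main tenant internally, the caller now resolves the tenant and passes it in. Below is a minimal, self-contained sketch of the resulting call pattern; ApiUriSketch and resolveTenant are stand-ins for illustration (resolveTenant mirrors what TenantIdSelectorService.getTenantId does in the OSS build), not the actual Kestra classes.

```java
// Sketch only: mirrors the new apiUri(path, tenantId) contract from the diff above.
public class ApiUriSketch {

    // stand-in for TenantIdSelectorService.getTenantId(...): OSS only accepts "main"
    static String resolveTenant(String requested) {
        if (requested != null && !requested.isBlank() && !"main".equals(requested)) {
            throw new IllegalArgumentException("Tenant id can only be 'main'");
        }
        return "main";
    }

    static String apiUri(String path, String tenantId) {
        if (path == null || !path.startsWith("/")) {
            throw new IllegalArgumentException("'path' must be non-null and start with '/'");
        }
        return "/api/v1/" + tenantId + path;
    }

    public static void main(String[] args) {
        String tenant = resolveTenant(null);                    // callers resolve the tenant first
        System.out.println(apiUri("/flows/validate", tenant));  // -> /api/v1/main/flows/validate
    }
}
```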

View File

@@ -1,5 +1,6 @@
package io.kestra.cli;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.serializers.YamlParser;
@@ -9,6 +10,7 @@ import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.io.IOException;
@@ -31,6 +33,9 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
@CommandLine.Parameters(index = "0", description = "the directory containing files to check")
protected Path directory;
@Inject
private TenantIdSelectorService tenantService;
/** {@inheritDoc} **/
@Override
protected boolean loadExternalPlugins() {
@@ -112,7 +117,7 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows/validate"), body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows/validate", tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
List<ValidateConstraintViolation> validations = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -2,11 +2,13 @@ package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -23,6 +25,9 @@ public class FlowCreateCommand extends AbstractApiCommand {
@CommandLine.Parameters(index = "0", description = "The file containing the flow")
public Path flowFile;
@Inject
private TenantIdSelectorService tenantService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
@@ -34,7 +39,7 @@ public class FlowCreateCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows"), body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows", tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -2,10 +2,12 @@ package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -23,6 +25,9 @@ public class FlowDeleteCommand extends AbstractApiCommand {
@CommandLine.Parameters(index = "1", description = "The ID of the flow")
public String id;
@Inject
private TenantIdSelectorService tenantService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
@@ -30,7 +35,7 @@ public class FlowDeleteCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.DELETE(apiUri("/flows/" + namespace + "/" + id ));
.DELETE(apiUri("/flows/" + namespace + "/" + id, tenantService.getTenantId(tenantId)));
client.toBlocking().exchange(
this.requestOptions(request)

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.micronaut.context.ApplicationContext;
import io.kestra.cli.services.TenantIdSelectorService;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
@@ -25,9 +25,8 @@ import java.nio.file.Path;
public class FlowExportCommand extends AbstractApiCommand {
private static final String DEFAULT_FILE_NAME = "flows.zip";
// @FIXME: Keep it because of a Micronaut bug that requires @Inject on the top-level command for injection into abstract classes
@Inject
private ApplicationContext applicationContext;
private TenantIdSelectorService tenantService;
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of flows to export")
public String namespace;
@@ -41,7 +40,7 @@ public class FlowExportCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<Object> request = HttpRequest
.GET(apiUri("/flows/export/by-query") + (namespace != null ? "?namespace=" + namespace : ""))
.GET(apiUri("/flows/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
.accept(MediaType.APPLICATION_OCTET_STREAM);
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);

View File

@@ -1,7 +1,8 @@
package io.kestra.cli.commands.flows;
import com.google.common.collect.ImmutableMap;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
@@ -30,7 +31,7 @@ import java.util.concurrent.TimeoutException;
description = "Test a flow"
)
@Slf4j
public class FlowTestCommand extends AbstractCommand {
public class FlowTestCommand extends AbstractApiCommand {
@Inject
private ApplicationContext applicationContext;
@@ -76,6 +77,7 @@ public class FlowTestCommand extends AbstractCommand {
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
FlowInputOutput flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
RunnerUtils runnerUtils = applicationContext.getBean(RunnerUtils.class);
TenantIdSelectorService tenantService = applicationContext.getBean(TenantIdSelectorService.class);
Map<String, Object> inputs = new HashMap<>();
@@ -89,7 +91,7 @@ public class FlowTestCommand extends AbstractCommand {
try {
runner.run();
repositoryLoader.load(file.toFile());
repositoryLoader.load(tenantService.getTenantId(tenantId), file.toFile());
List<Flow> all = flowRepository.findAllForAllTenants();
if (all.size() != 1) {

View File

@@ -2,11 +2,13 @@ package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -29,6 +31,9 @@ public class FlowUpdateCommand extends AbstractApiCommand {
@CommandLine.Parameters(index = "2", description = "The ID of the flow")
public String id;
@Inject
private TenantIdSelectorService tenantService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
@@ -40,7 +45,7 @@ public class FlowUpdateCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.PUT(apiUri("/flows/" + namespace + "/" + id ), body).contentType(MediaType.APPLICATION_YAML);
.PUT(apiUri("/flows/" + namespace + "/" + id, tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
@@ -9,6 +10,7 @@ import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -36,6 +38,9 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
@CommandLine.Option(names = {"--namespace"}, description = "The parent namespace of the flows, if not set, every namespace are allowed.")
public String namespace;
@Inject
private TenantIdSelectorService tenantIdSelectorService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
@@ -66,7 +71,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
namespaceQuery = "&namespace=" + namespace;
}
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows/bulk") + "?allowNamespaceChild=true&delete=" + delete + namespaceQuery, body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows/bulk", tenantIdSelectorService.getTenantId(tenantId)) + "?allowNamespaceChild=true&delete=" + delete + namespaceQuery, body).contentType(MediaType.APPLICATION_YAML);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -1,6 +1,7 @@
package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.services.FlowService;
@@ -22,6 +23,9 @@ public class FlowValidateCommand extends AbstractValidateCommand {
@Inject
private FlowService flowService;
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
return this.call(
@@ -35,7 +39,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
FlowWithSource flow = (FlowWithSource) object;
List<String> warnings = new ArrayList<>();
warnings.addAll(flowService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
warnings.addAll(flowService.warnings(flow, this.tenantId));
warnings.addAll(flowService.warnings(flow, tenantService.getTenantId(tenantId)));
return warnings;
},
(Object object) -> {

View File

@@ -3,6 +3,7 @@ package io.kestra.cli.commands.flows.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.commands.flows.IncludeHelperExpander;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
@@ -10,6 +11,7 @@ import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -30,6 +32,9 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
@CommandLine.Option(names = {"--override-namespaces"}, negatable = true, description = "Replace namespace of all flows by the one provided")
public boolean override = false;
@Inject
private TenantIdSelectorService tenantService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
@@ -59,7 +64,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
}
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows/") + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -2,12 +2,14 @@ package io.kestra.cli.commands.namespaces.files;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.utils.KestraIgnore;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.multipart.MultipartBody;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -34,6 +36,9 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
@CommandLine.Option(names = {"--delete"}, negatable = true, description = "Whether missing should be deleted")
public boolean delete = false;
@Inject
private TenantIdSelectorService tenantService;
private static final String KESTRA_IGNORE_FILE = ".kestraignore";
@Override
@@ -44,7 +49,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
try (var files = Files.walk(from); DefaultHttpClient client = client()) {
if (delete) {
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/") + namespace + "/files?path=" + to, null)));
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + to, null)));
}
KestraIgnore kestraIgnore = new KestraIgnore(from);
@@ -62,7 +67,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
client.toBlocking().exchange(
this.requestOptions(
HttpRequest.POST(
apiUri("/namespaces/") + namespace + "/files?path=" + destination,
apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + destination,
body
).contentType(MediaType.MULTIPART_FORM_DATA)
)

View File

@@ -3,11 +3,13 @@ package io.kestra.cli.commands.namespaces.kv;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.JacksonMapper;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Option;
@@ -42,6 +44,9 @@ public class KvUpdateCommand extends AbstractApiCommand {
@Option(names = {"-f", "--file-value"}, description = "The file from which to read the value to set. If this is provided, it will take precedence over any specified value.")
public Path fileValue;
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
super.call();
@@ -56,7 +61,7 @@ public class KvUpdateCommand extends AbstractApiCommand {
Duration ttl = expiration == null ? null : Duration.parse(expiration);
MutableHttpRequest<String> request = HttpRequest
.PUT(apiUri("/namespaces/") + namespace + "/kv/" + key, value)
.PUT(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/kv/" + key, value)
.contentType(MediaType.APPLICATION_JSON_TYPE);
if (ttl != null) {

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.cli.services.FileChangedEventListener;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
@@ -44,6 +45,9 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = {"-f", "--flow-path"}, description = "the flow path containing flow to inject at startup (when running with a memory flow repository)")
private File flowPath;
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
private String tenantId;
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to four times the number of available processors. Set it to 0 to avoid starting a worker.")
private int workerThread = defaultWorkerThread();
@@ -98,7 +102,8 @@ public class StandAloneCommand extends AbstractServerCommand {
if (flowPath != null) {
try {
LocalFlowRepositoryLoader localFlowRepositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
localFlowRepositoryLoader.load(null, this.flowPath);
TenantIdSelectorService tenantIdSelectorService = applicationContext.getBean(TenantIdSelectorService.class);
localFlowRepositoryLoader.load(tenantIdSelectorService.getTenantId(this.tenantId), this.flowPath);
} catch (IOException e) {
throw new CommandLine.ParameterException(this.spec.commandLine(), "Invalid flow path", e);
}

View File

@@ -2,8 +2,8 @@ package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.context.ApplicationContext;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
@@ -27,9 +27,8 @@ import java.nio.file.Path;
public class TemplateExportCommand extends AbstractApiCommand {
private static final String DEFAULT_FILE_NAME = "templates.zip";
// @FIXME: Keep it because of a Micronaut bug that requires @Inject on the top-level command for injection into abstract classes
@Inject
private ApplicationContext applicationContext;
private TenantIdSelectorService tenantService;
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
public String namespace;
@@ -43,7 +42,7 @@ public class TemplateExportCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<Object> request = HttpRequest
.GET(apiUri("/templates/export/by-query") + (namespace != null ? "?namespace=" + namespace : ""))
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
.accept(MediaType.APPLICATION_OCTET_STREAM);
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.serializers.YamlParser;
@@ -10,6 +11,7 @@ import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -27,6 +29,9 @@ import jakarta.validation.ConstraintViolationException;
@TemplateEnabled
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
super.call();
@@ -44,7 +49,7 @@ public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpda
try (DefaultHttpClient client = client()) {
MutableHttpRequest<List<Template>> request = HttpRequest
.POST(apiUri("/templates/") + namespace + "?delete=" + delete, templates);
.POST(apiUri("/templates/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, templates);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -12,8 +12,8 @@ import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
import jakarta.inject.Inject;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
@@ -26,6 +26,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
@Singleton
@Slf4j
@Requires(property = "micronaut.io.watch.enabled", value = "true")
@@ -111,6 +113,8 @@ public class FileChangedEventListener {
}
public void startListening(List<Path> paths) throws IOException, InterruptedException {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
for (Path path : paths) {
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
}
@@ -189,6 +193,8 @@ public class FileChangedEventListener {
}
private void loadFlowsFromFolder(Path folder) {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
try {
Files.walkFileTree(folder, new SimpleFileVisitor<Path>() {
@Override
@@ -232,6 +238,8 @@ public class FileChangedEventListener {
}
private Optional<FlowWithSource> parseFlow(String content, Path entry) {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
try {
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(tenantId, content, false);
modelValidator.validate(flow);

View File

@@ -0,0 +1,19 @@
package io.kestra.cli.services;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import io.kestra.core.exceptions.KestraRuntimeException;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
@Singleton
public class TenantIdSelectorService {
// For override purposes in Kestra EE
public String getTenantId(String tenantId) {
if (StringUtils.isNotBlank(tenantId) && !MAIN_TENANT.equals(tenantId)){
throw new KestraRuntimeException("Tenant id can only be 'main'");
}
return MAIN_TENANT;
}
}

View File

@@ -15,6 +15,9 @@ micronaut:
static:
paths: classpath:static
mapping: /static/**
root:
paths: classpath:root
mapping: /**
server:
max-request-size: 10GB
multipart:

View File

@@ -108,6 +108,34 @@ class FlowCreateOrUpdateCommandTest {
}
}
@Test
void should_fail_with_incorrect_tenant() {
URL directory = FlowCreateOrUpdateCommandTest.class.getClassLoader().getResource("flows");
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"--tenant", "incorrect",
directory.getPath(),
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(err.toString()).contains("Tenant id can only be 'main'");
err.reset();
}
}
@Test
void helper() {
URL directory = FlowCreateOrUpdateCommandTest.class.getClassLoader().getResource("helper");

View File

@@ -21,7 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class PluginDocCommandTest {
public static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.18.0-SNAPSHOT.jar";
public static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.24.0-SNAPSHOT.jar";
@Test
void run() throws IOException, URISyntaxException {

View File

@@ -20,7 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class PluginListCommandTest {
private static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.18.0-SNAPSHOT.jar";
private static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.24.0-SNAPSHOT.jar";
@Test
void shouldListPluginsInstalledLocally() throws IOException, URISyntaxException {

View File

@@ -0,0 +1,33 @@
package io.kestra.cli.commands.servers;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.junit.jupiter.api.Test;
public class TenantIdSelectorServiceTest {
@Test
void should_fail_without_tenant_id() {
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] start = {
"server", "standalone",
"-f", "unused",
"--tenant", "wrong_tenant"
};
PicocliRunner.call(App.class, ctx, start);
assertThat(err.toString()).contains("Tenant id can only be 'main'");
err.reset();
}
}
}

View File

@@ -118,12 +118,13 @@ public class JsonSchemaGenerator {
if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
.map(JsonNode::asText)
.toList();
.collect(Collectors.toList());
properties.fields().forEachRemaining(e -> {
int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
requiredPropsNode.remove(indexInRequiredArray);
requiredFieldValues.remove(indexInRequiredArray);
}
});
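The switch from Stream.toList() to Collectors.toList() matters because the loop above mutates requiredFieldValues via an index-based remove. A short sketch of the difference (note that Collectors.toList() returning an ArrayList is common practice, not a spec guarantee):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MutableListSketch {
    public static void main(String[] args) {
        // Stream.toList() returns an unmodifiable list: remove(...) throws
        List<String> immutable = Stream.of("a", "b").toList();
        try {
            immutable.remove(0);
        } catch (UnsupportedOperationException e) {
            System.out.println("Stream.toList() result cannot be mutated");
        }

        // Collectors.toList() returns an ArrayList in practice, so index-based removal works
        List<String> mutable = Stream.of("a", "b").collect(Collectors.toList());
        mutable.remove(0);
        System.out.println(mutable); // [b]
    }
}
```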

View File

@@ -284,7 +284,7 @@ public class HttpClient implements Closeable {
} else if (cls.isAssignableFrom(Byte[].class)) {
return (T) ArrayUtils.toObject(EntityUtils.toByteArray(entity));
} else {
return (T) JacksonMapper.ofJson().readValue(entity.getContent(), cls);
return (T) JacksonMapper.ofJson(false).readValue(entity.getContent(), cls);
}
}

View File

@@ -1,11 +1,14 @@
package io.kestra.core.metrics;
import io.kestra.core.models.ServerType;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micronaut.configuration.metrics.aggregator.MeterRegistryConfigurer;
import io.micronaut.context.annotation.Requires;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@@ -15,20 +18,26 @@ public class GlobalTagsConfigurer implements MeterRegistryConfigurer<SimpleMeter
@Inject
MetricConfig metricConfig;
@Nullable
@Value("${kestra.server-type}")
ServerType serverType;
@Override
public void configure(SimpleMeterRegistry meterRegistry) {
if (metricConfig.getTags() != null) {
meterRegistry
.config()
.commonTags(
metricConfig.getTags()
.entrySet()
.stream()
.flatMap(e -> Stream.of(e.getKey(), e.getValue()))
.toList()
.toArray(String[]::new)
);
}
String[] tags = Stream
.concat(
metricConfig.getTags() != null ? metricConfig.getTags()
.entrySet()
.stream()
.flatMap(e -> Stream.of(e.getKey(), e.getValue())) : Stream.empty(),
serverType != null ? Stream.of("server_type", serverType.name()) : Stream.empty()
)
.toList()
.toArray(String[]::new);
meterRegistry
.config()
.commonTags(tags);
}
@Override

View File

@@ -1,11 +1,10 @@
package io.kestra.core.models;
import io.kestra.core.utils.MapUtils;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
public record Label(@NotNull String key, @NotNull String value) {
@@ -29,11 +28,36 @@ public record Label(@NotNull String key, @NotNull String value) {
* @return the nested {@link Map}.
*/
public static Map<String, Object> toNestedMap(List<Label> labels) {
-Map<String, Object> asMap = labels.stream()
+return MapUtils.flattenToNestedMap(toMap(labels));
}
/**
* Static helper method for converting a list of labels to a flat map.
* Key order is kept.
*
* @param labels The list of {@link Label} to be converted.
* @return the flat {@link Map}.
*/
public static Map<String, String> toMap(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyMap();
return labels.stream()
.filter(label -> label.value() != null && label.key() != null)
-// using an accumulator in case labels with the same key exists: the first is kept
-.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> first));
-return MapUtils.flattenToNestedMap(asMap);
+// using an accumulator in case labels with the same key exists: the second is kept
+.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
}
/**
* Static helper method for deduplicating a list of labels by their key.
* Value of the last key occurrence is kept.
*
* @param labels The list of {@link Label} to be deduplicated.
* @return the deduplicated {@link List}.
*/
public static List<Label> deduplicate(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyList();
return toMap(labels).entrySet().stream()
.map(entry -> new Label(entry.getKey(), entry.getValue()))
.collect(Collectors.toCollection(ArrayList::new));
}
/**

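A minimal illustration of the intended semantics of the new Label helpers above (only the API shown in the diff is assumed): duplicate keys resolve to the last occurrence, and insertion order is preserved.

import io.kestra.core.models.Label;
import java.util.List;
import java.util.Map;

class LabelDedupExample {
    public static void main(String[] args) {
        List<Label> labels = List.of(
            new Label("env", "dev"),
            new Label("team", "data"),
            new Label("env", "prod")); // duplicate key: the last occurrence wins

        Map<String, String> flat = Label.toMap(labels);  // {env=prod, team=data}, insertion order kept
        List<Label> unique = Label.deduplicate(labels);  // [Label[key=env, value=prod], Label[key=team, value=data]]
        System.out.println(flat + " " + unique);
    }
}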
View File

@@ -125,7 +125,7 @@ public record QueryFilter(
END_DATE("endDate") {
@Override
public List<Op> supportedOp() {
-return List.of(Op.GREATER_THAN, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
+return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
}
},
STATE("state") {

View File

@@ -15,6 +15,8 @@ import jakarta.validation.constraints.NotNull;
@NoArgsConstructor
public class Setting {
public static final String INSTANCE_UUID = "instance.uuid";
public static final String INSTANCE_VERSION = "instance.version";
@NotNull
private String key;

View File

@@ -24,6 +24,7 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.LabelService;
import io.kestra.core.test.flow.TaskFixture;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
@@ -132,7 +133,7 @@ public class Execution implements DeletedInterface, TenantInterface {
}
public List<Label> getLabels() {
-return Optional.ofNullable(this.labels).orElse(new ArrayList<>());
+return ListUtils.emptyOnNull(this.labels);
}
/**
@@ -156,6 +157,7 @@ public class Execution implements DeletedInterface, TenantInterface {
.flowRevision(flow.getRevision())
.state(new State())
.scheduleDate(scheduleDate.map(ChronoZonedDateTime::toInstant).orElse(null))
.variables(flow.getVariables())
.build();
List<Label> executionLabels = new ArrayList<>(LabelService.labelsExcludingSystem(flow));
@@ -176,8 +178,22 @@ public class Execution implements DeletedInterface, TenantInterface {
}
/**
* Customization of Lombok-generated builder.
*/
public static class ExecutionBuilder {
/**
* Enforce unique values of {@link Label} when using the builder.
*
* @param labels The labels.
* @return Deduplicated labels.
*/
public ExecutionBuilder labels(List<Label> labels) {
this.labels = Label.deduplicate(labels);
return this;
}
void prebuild() {
this.originalId = this.id;
this.metadata = ExecutionMetadata.builder()
@@ -225,7 +241,6 @@ public class Execution implements DeletedInterface, TenantInterface {
}
public Execution withLabels(List<Label> labels) {
return new Execution(
this.tenantId,
this.id,
@@ -235,7 +250,7 @@ public class Execution implements DeletedInterface, TenantInterface {
this.taskRunList,
this.inputs,
this.outputs,
-labels,
+Label.deduplicate(labels),
this.variables,
this.state,
this.parentId,
@@ -365,7 +380,7 @@ public class Execution implements DeletedInterface, TenantInterface {
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
-* @param resolvedErrors finally tasks
+* @param resolvedFinally finally tasks
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(
@@ -991,6 +1006,16 @@ public class Execution implements DeletedInterface, TenantInterface {
return result;
}
/**
* Find all children of this {@link TaskRun}.
*/
public List<TaskRun> findChildren(TaskRun parentTaskRun) {
return taskRunList.stream()
.filter(taskRun -> parentTaskRun.getId().equals(taskRun.getParentTaskRunId()))
.toList();
}
public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
return (withCurrent ?
Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :

View File

@@ -7,6 +7,7 @@ import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
@@ -62,6 +63,11 @@ public class TaskRun implements TenantInterface {
@With
Boolean dynamic;
// Set it to true to force execution even if the execution is killed
@Nullable
@With
Boolean forceExecution;
@Deprecated
public void setItems(String items) {
// no-op for backward compatibility
@@ -81,7 +87,8 @@ public class TaskRun implements TenantInterface {
this.outputs,
this.state.withState(state),
this.iteration,
-this.dynamic
+this.dynamic,
+this.forceExecution
);
}
@@ -99,7 +106,8 @@ public class TaskRun implements TenantInterface {
this.outputs,
newState,
this.iteration,
-this.dynamic
+this.dynamic,
+this.forceExecution
);
}
@@ -121,7 +129,8 @@ public class TaskRun implements TenantInterface {
this.outputs,
this.state.withState(State.Type.FAILED),
this.iteration,
-this.dynamic
+this.dynamic,
+this.forceExecution
);
}

View File

@@ -1,20 +1,19 @@
package io.kestra.core.models.flows;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
@SuperBuilder
@Getter
@NoArgsConstructor
@Introspected
public class Concurrency {
@Positive
@Min(1)
@NotNull
private Integer limit;

View File

@@ -19,7 +19,6 @@ import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.FlowValidation;
import io.micronaut.core.annotation.Introspected;
@@ -30,8 +29,6 @@ import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.stream.Collectors;
@@ -187,19 +184,32 @@ public class Flow extends AbstractFlow implements HasUID {
.toList();
}
-public List<Task> allErrorsWithChilds() {
+public List<Task> allErrorsWithChildren() {
var allErrors = allTasksWithChilds().stream()
.filter(task -> task.isFlowable() && ((FlowableTask<?>) task).getErrors() != null)
.flatMap(task -> ((FlowableTask<?>) task).getErrors().stream())
.collect(Collectors.toCollection(ArrayList::new));
-if (this.getErrors() != null && !this.getErrors().isEmpty()) {
+if (!ListUtils.isEmpty(this.getErrors())) {
allErrors.addAll(this.getErrors());
}
return allErrors;
}
public List<Task> allFinallyWithChildren() {
var allFinally = allTasksWithChilds().stream()
.filter(task -> task.isFlowable() && ((FlowableTask<?>) task).getFinally() != null)
.flatMap(task -> ((FlowableTask<?>) task).getFinally().stream())
.collect(Collectors.toCollection(ArrayList::new));
if (!ListUtils.isEmpty(this.getFinally())) {
allFinally.addAll(this.getFinally());
}
return allFinally;
}
public Task findParentTasksByTaskId(String taskId) {
return allTasksWithChilds()
.stream()

View File

@@ -3,6 +3,7 @@ package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.tasks.Task;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
@@ -115,7 +116,7 @@ public class State {
}
public Instant maxDate() {
-if (this.histories.size() == 0) {
+if (this.histories.isEmpty()) {
return Instant.now();
}
@@ -123,7 +124,7 @@ public class State {
}
public Instant minDate() {
-if (this.histories.size() == 0) {
+if (this.histories.isEmpty()) {
return Instant.now();
}
@@ -167,6 +168,11 @@ public class State {
return this.current.isPaused();
}
@JsonIgnore
public boolean isQueued() {
return this.current.isQueued();
}
@JsonIgnore
public boolean isRetrying() {
return this.current.isRetrying();
@@ -200,6 +206,14 @@ public class State {
return this.histories.get(this.histories.size() - 2).state.isPaused();
}
/**
* Return true if the execution has failed, then was restarted.
* This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
*/
public boolean failedThenRestarted() {
return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
}
@Introspected
public enum Type {
CREATED,
@@ -253,9 +267,26 @@ public class State {
return this == Type.KILLED;
}
public boolean isQueued(){
return this == Type.QUEUED;
}
/**
* @return states that are terminal to an execution
*/
public static List<Type> terminatedTypes() {
return Stream.of(Type.values()).filter(type -> type.isTerminated()).toList();
}
/**
* Compute the final 'failure' of a task depending on <code>allowFailure</code> and <code>allowWarning</code>:
* - if both are true -> SUCCESS
* - if only <code>allowFailure</code> is true -> WARNING
* - if none -> FAILED
*/
public static State.Type fail(Task task) {
return task.isAllowFailure() ? (task.isAllowWarning() ? State.Type.SUCCESS : State.Type.WARNING) : State.Type.FAILED;
}
}
@Value

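As a quick check of the failure-state rule documented on State.Type.fail(Task) above, here is a minimal sketch of the same decision table using plain booleans (not the real class, purely illustrative):

final class FailStateSketch {
    // allowFailure && allowWarning -> SUCCESS
    // allowFailure only            -> WARNING
    // neither                      -> FAILED
    static String fail(boolean allowFailure, boolean allowWarning) {
        return allowFailure ? (allowWarning ? "SUCCESS" : "WARNING") : "FAILED";
    }

    public static void main(String[] args) {
        System.out.println(fail(true, true));   // SUCCESS
        System.out.println(fail(true, false));  // WARNING
        System.out.println(fail(false, false)); // FAILED
        System.out.println(fail(false, true));  // FAILED: allowWarning alone has no effect here
    }
}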
View File

@@ -108,7 +108,7 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
private List<String> renderExpressionValues(final Function<String, Object> renderer) {
Object result;
try {
-result = renderer.apply(expression);
+result = renderer.apply(expression.trim());
} catch (Exception e) {
throw ManualConstraintViolation.toConstraintViolationException(
"Cannot render 'expression'. Cause: " + e.getMessage(),

View File

@@ -86,7 +86,7 @@ public class SelectInput extends Input<String> implements RenderableInput {
private List<String> renderExpressionValues(final Function<String, Object> renderer) {
Object result;
try {
-result = renderer.apply(expression);
+result = renderer.apply(expression.trim());
} catch (Exception e) {
throw ManualConstraintViolation.toConstraintViolationException(
"Cannot render 'expression'. Cause: " + e.getMessage(),

View File

@@ -68,6 +68,19 @@ public class Property<T> {
String getExpression() {
return expression;
}
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
* <p>
* The returned property will still cache its rendered result.
* To re-evaluate on a subsequent render, call {@code skipCache()} again.
*
* @return a new {@link Property} without a pre-rendered value
*/
public Property<T> skipCache() {
return Property.ofExpression(expression);
}
/**
* Build a new Property object with a value already set.<br>

View File

@@ -30,7 +30,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
* Helper class for task runners and script tasks.
*/
public final class ScriptService {
-private static final Pattern INTERNAL_STORAGE_PATTERN = Pattern.compile("(kestra:\\/\\/[-a-zA-Z0-9%._\\+~#=/]*)");
+private static final Pattern INTERNAL_STORAGE_PATTERN = Pattern.compile("(kestra:\\/\\/[-\\p{Alnum}._\\+~#=/]*)", Pattern.UNICODE_CHARACTER_CLASS);
// These are the three common additional variables task runners must provide for variable rendering.
public static final String VAR_WORKING_DIR = "workingDir";

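To see what the switch to \p{Alnum} with UNICODE_CHARACTER_CLASS changes in practice, here is a small, self-contained comparison (the sample URI is made up):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class InternalStoragePatternExample {
    public static void main(String[] args) {
        String uri = "kestra:///company/team/files/éxport_1.csv"; // contains a non-ASCII letter

        Pattern ascii = Pattern.compile("(kestra:\\/\\/[-a-zA-Z0-9%._\\+~#=/]*)");
        Pattern unicode = Pattern.compile("(kestra:\\/\\/[-\\p{Alnum}._\\+~#=/]*)", Pattern.UNICODE_CHARACTER_CLASS);

        Matcher m1 = ascii.matcher(uri);
        Matcher m2 = unicode.matcher(uri);
        m1.find();
        m2.find();
        System.out.println(m1.group(1)); // stops before 'é': kestra:///company/team/files/
        System.out.println(m2.group(1)); // captures the full URI, accented character included
    }
}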
View File

@@ -329,6 +329,14 @@ public class DefaultPluginRegistry implements PluginRegistry {
pluginClassByIdentifier.clear();
}
/**
* {@inheritDoc}
**/
@Override
public boolean isVersioningSupported() {
return false;
}
public record PluginBundleIdentifier(@Nullable URL location) {
public static PluginBundleIdentifier CORE = new PluginBundleIdentifier(null);

View File

@@ -151,7 +151,7 @@ public class LocalPluginManager implements PluginManager {
* {@inheritDoc}
**/
@Override
-public PluginArtifact install(File file, boolean installForRegistration, @Nullable Path localRepositoryPath) {
+public PluginArtifact install(File file, boolean installForRegistration, @Nullable Path localRepositoryPath, boolean forceInstallOnExistingVersions) {
try {
PluginArtifact artifact = PluginArtifact.fromFile(file);
log.info("Installing managed plugin artifact '{}'", artifact);

View File

@@ -55,14 +55,16 @@ public interface PluginManager extends AutoCloseable {
/**
* Installs the given plugin artifact.
*
-* @param file the plugin JAR file.
-* @param installForRegistration specify whether plugin artifacts should be scanned and registered.
-* @param localRepositoryPath the optional local repository path to install artifact.
+* @param file the plugin JAR file.
+* @param installForRegistration specify whether plugin artifacts should be scanned and registered.
+* @param localRepositoryPath the optional local repository path to install artifact.
+* @param forceInstallOnExistingVersions specify whether plugin should be forced install upon the existing one
* @return The URI of the installed plugin.
*/
PluginArtifact install(final File file,
boolean installForRegistration,
-@Nullable Path localRepositoryPath);
+@Nullable Path localRepositoryPath,
+boolean forceInstallOnExistingVersions);
/**
* Installs the given plugin artifact.

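A hypothetical call site for the extended signature, following the parameter order documented above (the pluginManager variable and JAR path are placeholders; the PluginManager and PluginArtifact types from the diff are assumed to be on the classpath, imports omitted):

// installForRegistration = true, default local repository (null),
// forceInstallOnExistingVersions = force
static PluginArtifact installPlugin(PluginManager pluginManager, java.io.File jar, boolean force) {
    return pluginManager.install(jar, true, null, force);
}

// e.g. installPlugin(pluginManager, new java.io.File("/tmp/my-plugin.jar"), false) keeps an
// already-installed artifact with the same version, while passing true overwrites it.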
View File

@@ -116,4 +116,11 @@ public interface PluginRegistry {
default void clear() {
}
/**
* Checks whether plugin-versioning is supported by this registry.
*
* @return {@code true} if supported. Otherwise {@code false}.
*/
boolean isVersioningSupported();
}

View File

@@ -18,9 +18,11 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -202,19 +204,13 @@ public class PluginScanner {
var guidesDirectory = classLoader.getResource("doc/guides");
if (guidesDirectory != null) {
-try (var fileSystem = FileSystems.newFileSystem(guidesDirectory.toURI(), Collections.emptyMap())) {
-var root = fileSystem.getPath("/doc/guides");
-try (var stream = Files.walk(root, 1)) {
-stream
-.skip(1) // first element is the root element
-.sorted(Comparator.comparing(path -> path.getName(path.getParent().getNameCount()).toString()))
-.forEach(guide -> {
-var guideName = guide.getName(guide.getParent().getNameCount()).toString();
-guides.add(guideName.substring(0, guideName.lastIndexOf('.')));
-});
-}
+try {
+var root = Path.of(guidesDirectory.toURI());
+addGuides(root, guides);
} catch (IOException | URISyntaxException e) {
// silently fail
+} catch (FileSystemNotFoundException e) {
+addGuidesThroughNewFileSystem(guidesDirectory, guides);
}
}
@@ -243,6 +239,27 @@ public class PluginScanner {
.build();
}
private static void addGuidesThroughNewFileSystem(URL guidesDirectory, List<String> guides) {
try (var fileSystem = FileSystems.newFileSystem(guidesDirectory.toURI(), Collections.emptyMap())) {
var root = fileSystem.getPath("doc/guides");
addGuides(root, guides);
} catch (IOException | URISyntaxException e) {
// silently fail
}
}
private static void addGuides(Path root, List<String> guides) throws IOException {
try (var stream = Files.walk(root, 1)) {
stream
.skip(1) // first element is the root element
.sorted(Comparator.comparing(path -> path.getName(path.getParent().getNameCount()).toString()))
.forEach(guide -> {
var guideName = guide.getName(guide.getParent().getNameCount()).toString();
guides.add(guideName.substring(0, guideName.lastIndexOf('.')));
});
}
}
public static Manifest getManifest(ClassLoader classLoader) {
try {
URL url = classLoader.getResource(JarFile.MANIFEST_NAME);

View File

@@ -86,7 +86,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
DeserializationContext context) throws IOException {
Class<? extends Plugin> pluginType = null;
-final String identifier = extractPluginRawIdentifier(node);
+final String identifier = extractPluginRawIdentifier(node, pluginRegistry.isVersioningSupported());
if (identifier != null) {
log.trace("Looking for Plugin for: {}",
identifier
@@ -103,7 +103,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
);
if (DataChart.class.isAssignableFrom(pluginType)) {
-final Class<? extends Plugin> dataFilterClass = pluginRegistry.findClassByIdentifier(extractPluginRawIdentifier(node.get("data")));
+final Class<? extends Plugin> dataFilterClass = pluginRegistry.findClassByIdentifier(extractPluginRawIdentifier(node.get("data"), pluginRegistry.isVersioningSupported()));
ParameterizedType genericDataFilterClass = (ParameterizedType) dataFilterClass.getGenericSuperclass();
Type dataFieldsEnum = genericDataFilterClass.getActualTypeArguments()[0];
TypeFactory typeFactory = JacksonMapper.ofJson().getTypeFactory();
@@ -142,7 +142,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
);
}
-static String extractPluginRawIdentifier(final JsonNode node) {
+static String extractPluginRawIdentifier(final JsonNode node, final boolean isVersioningSupported) {
String type = Optional.ofNullable(node.get(TYPE)).map(JsonNode::textValue).orElse(null);
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::textValue).orElse(null);
@@ -150,6 +150,6 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
return null;
}
-return version != null && !version.isEmpty() ? type + ":" + version : type;
+return isVersioningSupported && version != null && !version.isEmpty() ? type + ":" + version : type;
}
}

View File

@@ -28,6 +28,7 @@ public interface QueueFactoryInterface {
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
QueueInterface<Execution> execution();
@@ -62,4 +63,6 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionResult> subflowExecutionResult();
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<ExecutionRunning> executionRunning();
}

View File

@@ -5,6 +5,7 @@ import io.kestra.core.models.Pauseable;
import io.kestra.core.utils.Either;
import java.io.Closeable;
import java.util.List;
import java.util.function.Consumer;
public interface QueueInterface<T> extends Closeable, Pauseable {
@@ -18,7 +19,15 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
emitAsync(null, message);
}
-void emitAsync(String consumerGroup, T message) throws QueueException;
+default void emitAsync(String consumerGroup, T message) throws QueueException {
+emitAsync(consumerGroup, List.of(message));
+}
default void emitAsync(List<T> messages) throws QueueException {
emitAsync(null, messages);
}
void emitAsync(String consumerGroup, List<T> messages) throws QueueException;
default void delete(T message) throws QueueException {
delete(null, message);

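A short sketch of a producer using the new list-based emitAsync (types as in the diff, imports omitted; the method and variable names are placeholders). Emitting a whole batch in one call keeps the entries of one event together instead of interleaving them with other producers:

void emitBatch(QueueInterface<LogEntry> logQueue, List<LogEntry> entries) throws QueueException {
    logQueue.emitAsync(entries);               // same as emitAsync(null, entries)
    // logQueue.emitAsync(entries.getFirst()); // the single-message default now wraps into List.of(...)
}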
View File

@@ -27,8 +27,6 @@ public class QueueService {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == MetricEntry.class) {
return null;
} else if (object.getClass() == ExecutionRunning.class) {
return ((ExecutionRunning) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionEnd.class) {
return ((SubflowExecutionEnd) object).getParentExecutionId();
} else {

View File

@@ -0,0 +1,12 @@
package io.kestra.core.queues;
import java.io.Serial;
public class UnsupportedMessageException extends QueueException {
@Serial
private static final long serialVersionUID = 1L;
public UnsupportedMessageException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.utils.IdUtils;
import jakarta.validation.constraints.NotNull;
@@ -11,7 +12,7 @@ import lombok.With;
@Value
@AllArgsConstructor
@Builder
-public class ExecutionRunning {
+public class ExecutionRunning implements HasUID {
String tenantId;
@NotNull
@@ -26,6 +27,7 @@ public class ExecutionRunning {
@With
ConcurrencyState concurrencyState;
@Override
public String uid() {
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId, this.execution.getId());
}

View File

@@ -85,7 +85,8 @@ public class Executor {
}
public Boolean canBeProcessed() {
-return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null || this.getExecution().isDeleted() || this.getExecution().getState().isPaused());
+return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
+this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isQueued());
}
public Executor withFlow(FlowWithSource flow) {

View File

@@ -67,6 +67,9 @@ public class ExecutorService {
@Inject
private WorkerGroupExecutorInterface workerGroupExecutorInterface;
@Inject
private WorkerJobRunningStateStore workerJobRunningStateStore;
protected FlowMetaStoreInterface flowExecutorInterface;
@Inject
@@ -97,49 +100,39 @@ public class ExecutorService {
return this.flowExecutorInterface;
}
-public Executor checkConcurrencyLimit(Executor executor, FlowInterface flow, Execution execution, long count) {
-// if above the limit, handle concurrency limit based on its behavior
-if (count >= flow.getConcurrency().getLimit()) {
+public ExecutionRunning processExecutionRunning(FlowInterface flow, int runningCount, ExecutionRunning executionRunning) {
+// if concurrency was removed, it can be null as we always get the latest flow definition
+if (flow.getConcurrency() != null && runningCount >= flow.getConcurrency().getLimit()) {
return switch (flow.getConcurrency().getBehavior()) {
case QUEUE -> {
-var newExecution = execution.withState(State.Type.QUEUED);
-ExecutionRunning executionRunning = ExecutionRunning.builder()
-.tenantId(flow.getTenantId())
-.namespace(flow.getNamespace())
-.flowId(flow.getId())
-.execution(newExecution)
-.concurrencyState(ExecutionRunning.ConcurrencyState.QUEUED)
-.build();
// when max concurrency is reached, we throttle the execution and stop processing
logService.logExecution(
-newExecution,
+executionRunning.getExecution(),
Level.INFO,
-"Flow is queued due to concurrency limit exceeded, {} running(s)",
-count
+"Execution is queued due to concurrency limit exceeded, {} running(s)",
+runningCount
);
-// return the execution queued
-yield executor
-.withExecutionRunning(executionRunning)
-.withExecution(newExecution, "checkConcurrencyLimit");
+var newExecution = executionRunning.getExecution().withState(State.Type.QUEUED);
+metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
+yield executionRunning
+.withExecution(newExecution)
+.withConcurrencyState(ExecutionRunning.ConcurrencyState.QUEUED);
}
case CANCEL ->
-executor.withExecution(execution.withState(State.Type.CANCELLED), "checkConcurrencyLimit");
+executionRunning
+.withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
+.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
case FAIL ->
-executor.withException(new IllegalStateException("Flow is FAILED due to concurrency limit exceeded"), "checkConcurrencyLimit");
+executionRunning
+.withExecution(executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded")).getExecution())
+.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
};
}
-// if under the limit, update the executor with a RUNNING ExecutionRunning to track them
-var executionRunning = new ExecutionRunning(
-flow.getTenantId(),
-flow.getNamespace(),
-flow.getId(),
-executor.getExecution(),
-ExecutionRunning.ConcurrencyState.RUNNING
-);
-return executor.withExecutionRunning(executionRunning);
+// if under the limit, run it!
+return executionRunning
+.withExecution(executionRunning.getExecution().withState(State.Type.RUNNING))
+.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
}
public Executor process(Executor executor) {
@@ -242,9 +235,9 @@ public class ExecutorService {
try {
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
} catch (Exception e) {
-// This will lead to the next task being still executed but at least Kestra will not crash.
+// This will lead to the next task being still executed, but at least Kestra will not crash.
// This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
-runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), e);
+runContext.logger().error("Unable to resolve state from the Flowable task: {}", e.getMessage(), e);
state = Optional.of(State.Type.FAILED);
}
Optional<WorkerTaskResult> endedTask = childWorkerTaskTypeToWorkerTask(
@@ -592,6 +585,23 @@ public class ExecutorService {
list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId()))
.collect(Collectors.toCollection(ArrayList::new));
}
// If the task is a flowable and its terminated, check that all children are terminated.
// This may not be the case for parallel flowable tasks like Parallel, Dag, ForEach...
// After a fail task, some child flowable may not be correctly terminated.
if (task instanceof FlowableTask<?> && taskRun.getState().isTerminated()) {
List<TaskRun> updated = executor.getExecution().findChildren(taskRun).stream()
.filter(child -> !child.getState().isTerminated())
.map(throwFunction(child -> child.withState(taskRun.getState().getCurrent())))
.toList();
if (!updated.isEmpty()) {
Execution execution = executor.getExecution();
for (TaskRun child : updated) {
execution = execution.withTaskRun(child);
}
executor = executor.withExecution(execution, "handledTerminatedFlowableTasks");
}
}
}
metricRegistry
@@ -664,7 +674,7 @@ public class ExecutorService {
.taskRunId(workerTaskResult.getTaskRun().getId())
.executionId(executor.getExecution().getId())
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(duration != null ? duration : timeout))
-.state(duration != null ? behavior.mapToState() : State.Type.FAILED)
+.state(duration != null ? behavior.mapToState() : State.Type.fail(pauseTask))
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
}
@@ -732,6 +742,7 @@ public class ExecutorService {
List<TaskRun> afterExecutionNexts = FlowableUtils.resolveSequentialNexts(executor.getExecution(), afterExecutionResolvedTasks)
.stream()
.map(throwFunction(NextTaskRun::getTaskRun))
.map(taskRun -> taskRun.withForceExecution(true)) // forceExecution so it would be executed even if the execution is killed
.toList();
if (!afterExecutionNexts.isEmpty()) {
return executor.withTaskRun(afterExecutionNexts, "handleAfterExecution ");
@@ -1072,6 +1083,25 @@ public class ExecutorService {
newExecution = executionService.killParentTaskruns(taskRun, newExecution);
}
executor.withExecution(newExecution, "addWorkerTaskResult");
if (taskRun.getState().isTerminated()) {
log.trace("TaskRun terminated: {}", taskRun);
workerJobRunningStateStore.deleteByKey(taskRun.getId());
metricRegistry
.counter(
MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT,
MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT_DESCRIPTION,
metricRegistry.tags(workerTaskResult)
)
.increment();
metricRegistry
.timer(
MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION,
MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION_DESCRIPTION,
metricRegistry.tags(workerTaskResult)
)
.record(taskRun.getState().getDuration());
}
}
// Note: as the flow is only used in an error branch and it can take time to load, we pass it thought a Supplier

View File

@@ -286,18 +286,10 @@ public class FlowableUtils {
// start as many tasks as we have concurrency slots
return collect.values().stream()
.map(resolvedTasks -> filterCreated(resolvedTasks, taskRuns, parentTaskRun))
.map(resolvedTasks -> resolveSequentialNexts(execution, resolvedTasks, null, null, parentTaskRun))
.filter(resolvedTasks -> !resolvedTasks.isEmpty())
.limit(concurrencySlots)
.map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
.toList();
}
private static List<ResolvedTask> filterCreated(List<ResolvedTask> tasks, List<TaskRun> taskRuns, TaskRun parentTaskRun) {
return tasks.stream()
.filter(resolvedTask -> taskRuns.stream()
.noneMatch(taskRun -> FlowableUtils.isTaskRunFor(resolvedTask, taskRun, parentTaskRun))
)
.map(resolvedTasks -> resolvedTasks.getFirst())
.toList();
}

View File

@@ -29,7 +29,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
public class RunContextLogger implements Supplier<org.slf4j.Logger> {
-private static final int MAX_MESSAGE_LENGTH = 1024 * 10;
+private static final int MAX_MESSAGE_LENGTH = 1024 * 15;
public static final String ORIGINAL_TIMESTAMP_KEY = "originalTimestamp";
private final String loggerName;
@@ -80,7 +80,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
}
List<LogEntry> result = new ArrayList<>();
-long i = 0;
for (String s : split) {
result.add(LogEntry.builder()
.namespace(logEntry.getNamespace())
@@ -98,7 +97,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
.thread(event.getThreadName())
.build()
);
-i++;
}
return result;
@@ -331,14 +329,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
protected void append(ILoggingEvent e) {
e = this.transform(e);
-logEntries(e, logEntry)
-.forEach(l -> {
-try {
-logQueue.emitAsync(l);
-} catch (QueueException ex) {
-log.warn("Unable to emit logQueue", ex);
-}
-});
+try {
+logQueue.emitAsync(logEntries(e, logEntry));
+} catch (QueueException ex) {
+log.warn("Unable to emit logQueue", ex);
+}
}
}

View File

@@ -4,15 +4,11 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -27,12 +23,19 @@ public class RunContextProperty<T> {
private final RunContext runContext;
private final Task task;
private final AbstractTrigger trigger;
private final boolean skipCache;
RunContextProperty(Property<T> property, RunContext runContext) {
this(property, runContext, false);
}
RunContextProperty(Property<T> property, RunContext runContext, boolean skipCache) {
this.property = property;
this.runContext = runContext;
this.task = ((DefaultRunContext) runContext).getTask();
this.trigger = ((DefaultRunContext) runContext).getTrigger();
this.skipCache = skipCache;
}
private void validate() {
@@ -45,6 +48,19 @@ public class RunContextProperty<T> {
log.trace("Unable to do validation: no task or trigger found");
}
}
/**
* Returns a new {@link RunContextProperty} that will always be rendered by evaluating
* its original Pebble expression, without using any previously cached value.
* <p>
* This ensures that each time the property is rendered, the underlying
* expression is re-evaluated to produce a fresh result.
*
* @return a new {@link Property} that bypasses the cache
*/
public RunContextProperty<T> skipCache() {
return new RunContextProperty<>(this.property, this.runContext, true);
}
/**
* Render a property then convert it to its target type and validate it.<br>
@@ -55,13 +71,13 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz) throws IllegalVariableEvaluationException {
-var as = Optional.ofNullable(this.property)
+var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz)));
validate();
return as;
}
/**
* Render a property with additional variables, then convert it to its target type and validate it.<br>
*
@@ -71,7 +87,7 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
-var as = Optional.ofNullable(this.property)
+var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz, variables)));
validate();
@@ -89,7 +105,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz) throws IllegalVariableEvaluationException {
-var as = Optional.ofNullable(this.property)
+var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz)))
.orElse((T) Collections.emptyList());
@@ -108,7 +124,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
-var as = Optional.ofNullable(this.property)
+var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz, variables)))
.orElse((T) Collections.emptyList());
@@ -127,7 +143,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
-var as = Optional.ofNullable(this.property)
+var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass)))
.orElse((T) Collections.emptyMap());
@@ -146,11 +162,15 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
-var as = Optional.ofNullable(this.property)
+var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass, variables)))
.orElse((T) Collections.emptyMap());
validate();
return as;
}
private Property<T> getProperty() {
return skipCache ? this.property.skipCache() : this.property;
}
}

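A hedged usage sketch from a plugin task's perspective, showing the intent of skipCache() (it assumes the task reaches the property through the usual runContext.render(property) helper and that this.message is a Property<String> field of the task; the fragment is illustrative, not part of the diff):

// Without skipCache(), the second as(...) call would reuse the value cached by the first render.
String first = runContext.render(this.message).as(String.class).orElse(null);

// Force a re-evaluation of the original Pebble expression, e.g. when the expression
// depends on something that changed between two renders ("{{ now() }}", a counter, ...).
String second = runContext.render(this.message).skipCache().as(String.class).orElse(null);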
View File

@@ -12,6 +12,7 @@ import io.kestra.core.models.flows.input.SecretInput;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.trigger.Schedule;
import lombok.AllArgsConstructor;
import lombok.With;
@@ -27,6 +28,7 @@ import java.util.function.Consumer;
*/
public final class RunVariables {
public static final String SECRET_CONSUMER_VARIABLE_NAME = "addSecretConsumer";
public static final String FIXTURE_FILES_KEY = "io.kestra.datatype:test_fixtures_files";
/**
* Creates an immutable map representation of the given {@link Task}.
@@ -181,9 +183,6 @@ public final class RunVariables {
// Flow
if (flow != null) {
builder.put("flow", RunVariables.of(flow));
-if (flow.getVariables() != null) {
-builder.put("vars", flow.getVariables());
-}
}
// Task
@@ -298,16 +297,19 @@ public final class RunVariables {
if (execution.getTrigger() != null && execution.getTrigger().getVariables() != null) {
builder.put("trigger", execution.getTrigger().getVariables());
// temporal hack to add back the `schedule`variables
// will be removed in 2.0
if (Schedule.class.getName().equals(execution.getTrigger().getType())) {
// add back its variables inside the `schedule` variables
builder.put("schedule", execution.getTrigger().getVariables());
}
}
if (execution.getLabels() != null) {
builder.put("labels", Label.toNestedMap(execution.getLabels()));
}
-if (execution.getVariables() != null) {
-builder.putAll(execution.getVariables());
-}
if (flow == null) {
Flow flowFromExecution = Flow.builder()
.id(execution.getFlowId())
@@ -319,6 +321,20 @@ public final class RunVariables {
}
}
// variables
Optional.ofNullable(execution)
.map(Execution::getVariables)
.or(() -> Optional.ofNullable(flow).map(FlowInterface::getVariables))
.map(HashMap::new)
.ifPresent(variables -> {
Object fixtureFiles = variables.remove(FIXTURE_FILES_KEY);
builder.put("vars", ImmutableMap.copyOf(variables));
if (fixtureFiles != null) {
builder.put("files", fixtureFiles);
}
});
// Kestra configuration
if (kestraConfiguration != null) {
Map<String, String> kestra = HashMap.newHashMap(2);

View File

@@ -45,7 +45,7 @@ final class Secret {
for (var entry: data.entrySet()) {
if (entry.getValue() instanceof Map map) {
// if some value are of type EncryptedString we decode them and replace the object
-if (EncryptedString.TYPE.equalsIgnoreCase((String)map.get("type"))) {
+if (map.get("type") instanceof String typeStr && EncryptedString.TYPE.equalsIgnoreCase(typeStr)) {
try {
String decoded = decrypt((String) map.get("value"));
decryptedMap.put(entry.getKey(), decoded);

View File

@@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
@@ -19,6 +18,7 @@ import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.server.*;
import io.kestra.core.services.LabelService;
import io.kestra.core.services.LogService;
import io.kestra.core.services.MaintenanceService;
import io.kestra.core.services.VariablesService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.storages.StorageContext;
@@ -158,6 +158,9 @@ public class Worker implements Service, Runnable, AutoCloseable {
private TracerFactory tracerFactory;
private Tracer tracer;
@Inject
private MaintenanceService maintenanceService;
/**
* Creates a new {@link Worker} instance.
*
@@ -285,8 +288,12 @@ public class Worker implements Service, Runnable, AutoCloseable {
));
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
+if (this.maintenanceService.isInMaintenanceMode()) {
+enterMaintenance();
+} else {
+setState(ServiceState.RUNNING);
+}
-setState(ServiceState.RUNNING);
if (workerGroupKey != null) {
log.info("Worker started with {} thread(s) in group '{}'", numThreads, workerGroupKey);
}
@@ -304,21 +311,25 @@ public class Worker implements Service, Runnable, AutoCloseable {
ClusterEvent clusterEvent = either.getLeft();
log.info("Cluster event received: {}", clusterEvent);
switch (clusterEvent.eventType()) {
-case MAINTENANCE_ENTER -> {
-this.executionKilledQueue.pause();
-this.workerJobQueue.pause();
-this.setState(ServiceState.MAINTENANCE);
-}
-case MAINTENANCE_EXIT -> {
-this.executionKilledQueue.resume();
-this.workerJobQueue.resume();
-this.setState(ServiceState.RUNNING);
-}
+case MAINTENANCE_ENTER -> enterMaintenance();
+case MAINTENANCE_EXIT -> exitMaintenance();
}
}
private void enterMaintenance() {
this.executionKilledQueue.pause();
this.workerJobQueue.pause();
this.setState(ServiceState.MAINTENANCE);
}
private void exitMaintenance() {
this.executionKilledQueue.resume();
this.workerJobQueue.resume();
this.setState(ServiceState.RUNNING);
}
private void setState(final ServiceState state) {
this.state.set(state);
Map<String, Object> properties = new HashMap<>();
@@ -395,11 +406,16 @@ public class Worker implements Service, Runnable, AutoCloseable {
} catch (IllegalVariableEvaluationException e) {
RunContextLogger contextLogger = runContextLoggerFactory.create(currentWorkerTask);
contextLogger.logger().error("Failed evaluating runIf: {}", e.getMessage(), e);
try {
this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.fail()));
} catch (QueueException ex) {
log.error("Unable to emit the worker task result for task {} taskrun {}", currentWorkerTask.getTask().getId(), currentWorkerTask.getTaskRun().getId(), e);
}
} catch (QueueException e) {
log.error("Unable to emit the worker task result for task {} taskrun {}", currentWorkerTask.getTask().getId(), currentWorkerTask.getTaskRun().getId(), e);
}
-if (workerTaskResult.getTaskRun().getState().isFailed() && !currentWorkerTask.getTask().isAllowFailure()) {
+if (workerTaskResult == null || workerTaskResult.getTaskRun().getState().isFailed() && !currentWorkerTask.getTask().isAllowFailure()) {
break;
}
@@ -624,7 +640,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
));
}
-if (killedExecution.contains(workerTask.getTaskRun().getExecutionId())) {
+if (! Boolean.TRUE.equals(workerTask.getTaskRun().getForceExecution()) && killedExecution.contains(workerTask.getTaskRun().getExecutionId())) {
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun().withState(KILLED));
try {
this.workerTaskResultQueue.emit(workerTaskResult);
@@ -685,6 +701,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
this.workerTaskResultQueue.emit(workerTaskResult);
return workerTaskResult;
} catch (QueueException e) {
@@ -695,6 +712,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
// If it's a message too big, we remove the outputs
failed = failed.withOutputs(Variables.empty());
}
if (e instanceof UnsupportedMessageException) {
// we expect the offending char is in the output so we remove it
failed = failed.withOutputs(Variables.empty());
}
WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
@@ -776,7 +797,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
if (!(workerTask.getTask() instanceof RunnableTask<?> task)) {
// This should never happen but better to deal with it than crashing the Worker
-var state = workerTask.getTask().isAllowFailure() ? workerTask.getTask().isAllowWarning() ? SUCCESS : WARNING : FAILED;
+var state = State.Type.fail(workerTask.getTask());
TaskRunAttempt attempt = TaskRunAttempt.builder()
.state(new io.kestra.core.models.flows.State().withState(state))
.workerId(this.id)

View File

@@ -0,0 +1,20 @@
package io.kestra.core.runners;
/**
* State store containing all workers' jobs in RUNNING state.
*
* @see WorkerJob
*/
public interface WorkerJobRunningStateStore {
/**
* Deletes a running worker job for the given key.
*
* <p>
* A key can be a {@link WorkerTask} Task Run ID.
* </p>
*
* @param key the key of the worker job to be deleted.
*/
void deleteByKey(String key);
}

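A minimal in-memory sketch of what an implementation of this store could look like, purely for illustration (the real stores are backend-specific; the map-based class below and its put method are not part of the diff, and imports of the interface are assumed):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a trivial in-memory store keyed by Task Run ID.
class InMemoryWorkerJobRunningStateStore implements WorkerJobRunningStateStore {
    private final Map<String, Object> running = new ConcurrentHashMap<>();

    void put(String taskRunId, Object workerJob) {
        running.put(taskRunId, workerJob);
    }

    @Override
    public void deleteByKey(String key) {
        running.remove(key);
    }
}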
View File

@@ -48,7 +48,7 @@ public class WorkerTask extends WorkerJob {
* @return this worker task, updated
*/
public TaskRun fail() {
-var state = this.task.isAllowFailure() ? this.task.isAllowWarning() ? State.Type.SUCCESS : State.Type.WARNING : State.Type.FAILED;
+var state = State.Type.fail(task);
return this.getTaskRun().withState(state);
}
}

View File

@@ -24,7 +24,7 @@ public class DateAddFilter extends AbstractDate implements Filter {
return null;
}
-final Long amount = (Long) args.get("amount");
+final Long amount = getAsLong(args.get("amount"), lineNumber, self);
final String unit = (String) args.get("unit");
final String timeZone = (String) args.get("timeZone");
final String existingFormat = (String) args.get("existingFormat");
@@ -36,4 +36,24 @@ public class DateAddFilter extends AbstractDate implements Filter {
return format(plus, args, context);
}
public static Long getAsLong(Object value, int lineNumber, PebbleTemplate self) {
if (value instanceof Long longValue) {
return longValue;
} else if (value instanceof Integer integerValue) {
return integerValue.longValue();
} else if (value instanceof Number numberValue) {
return numberValue.longValue();
} else if (value instanceof String stringValue) {
try {
return Long.parseLong(stringValue);
} catch (NumberFormatException e) {
throw new PebbleException(e, "%s can't be converted to long".formatted(stringValue),
lineNumber, self != null ? self.getName() : "Unknown");
}
}
throw new PebbleException(null, "Incorrect %s format, must be a number".formatted(value),
lineNumber, self != null ? self.getName() : "Unknown");
}
}

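A small standalone check of the coercion rules implemented by getAsLong above (imports of DateAddFilter are assumed; passing null as the PebbleTemplate is tolerated by the method when it builds an error message, and the values are arbitrary):

class GetAsLongExample {
    public static void main(String[] args) {
        System.out.println(DateAddFilter.getAsLong(3L, 1, null));   // 3 (Long passthrough)
        System.out.println(DateAddFilter.getAsLong(3, 1, null));    // 3 (Integer widened)
        System.out.println(DateAddFilter.getAsLong(3.0d, 1, null)); // 3 (any Number truncated to long)
        System.out.println(DateAddFilter.getAsLong("3", 1, null));  // 3 (String parsed)
        // DateAddFilter.getAsLong("three", 1, null) -> PebbleException: "three can't be converted to long"
    }
}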
View File

@@ -30,10 +30,7 @@ import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.*;
-import io.kestra.core.utils.Await;
-import io.kestra.core.utils.Either;
-import io.kestra.core.utils.IdUtils;
-import io.kestra.core.utils.ListUtils;
+import io.kestra.core.utils.*;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.util.CollectionUtils;
@@ -86,12 +83,15 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private final LogService logService;
protected SchedulerExecutionStateInterface executionState;
private final WorkerGroupExecutorInterface workerGroupExecutorInterface;
private final MaintenanceService maintenanceService;
// must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
private volatile Boolean isReady = false;
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> scheduledFuture;
private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> executionMonitorFuture;
@Getter
protected SchedulerTriggerStateInterface triggerState;
@@ -136,6 +136,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.serviceStateEventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.executionEventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.workerGroupExecutorInterface = applicationContext.getBean(WorkerGroupExecutorInterface.class);
this.maintenanceService = applicationContext.getBean(MaintenanceService.class);
setState(ServiceState.CREATED);
}
@@ -150,7 +152,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.flowListeners.run();
this.flowListeners.listen(this::initializedTriggers);
-ScheduledFuture<?> evaluationLoop = scheduleExecutor.scheduleAtFixedRate(
+scheduledFuture = scheduleExecutor.scheduleAtFixedRate(
this::handle,
0,
1,
@@ -160,10 +162,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the evaluation loop thread
Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
() -> {
-Await.until(evaluationLoop::isDone);
+Await.until(scheduledFuture::isDone);
try {
-evaluationLoop.get();
+scheduledFuture.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -175,7 +177,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
);
// Periodically report metrics and logs of running executions
-ScheduledFuture<?> monitoringLoop = executionMonitorExecutor.scheduleWithFixedDelay(
+executionMonitorFuture = executionMonitorExecutor.scheduleWithFixedDelay(
this::executionMonitor,
30,
10,
@@ -185,10 +187,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the monitoring loop thread
Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
() -> {
-Await.until(monitoringLoop::isDone);
+Await.until(executionMonitorFuture::isDone);
try {
-monitoringLoop.get();
+executionMonitorFuture.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -289,14 +291,19 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// listen to cluster events
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(((QueueInterface<ClusterEvent>) clusterEventQueueInterface).receive(this::clusterEventQueue)));
-setState(ServiceState.RUNNING);
+if (this.maintenanceService.isInMaintenanceMode()) {
+enterMaintenance();
+} else {
+setState(ServiceState.RUNNING);
+}
log.info("Scheduler started");
}
// Initialized local trigger state,
// and if some flows were created outside the box, for example from the CLI,
// then we may have some triggers that are not created yet.
/* FIXME: There is a race between Kafka stream consumption & initializedTriggers: we can override a trigger update coming from a stream consumption with an old one because stream consumption is not waiting for trigger initialization
* Example: we see a SUCCESS execution so we reset the trigger's executionId but then the initializedTriggers resubmits an old trigger state for some reasons (evaluationDate for eg.) */
private void initializedTriggers(List<FlowWithSource> flows) {
record FlowAndTrigger(FlowWithSource flow, AbstractTrigger trigger) {
@Override
@@ -366,10 +373,13 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.triggerState.update(lastUpdate);
}
-} else if (recoverMissedSchedules == RecoverMissedSchedules.NONE) {
-lastUpdate = trigger.get().toBuilder().nextExecutionDate(schedule.nextEvaluationDate()).build();
+} else {
+ZonedDateTime nextEvaluationDate = schedule.nextEvaluationDate();
+if (recoverMissedSchedules == RecoverMissedSchedules.NONE && !Objects.equals(trigger.get().getNextExecutionDate(), nextEvaluationDate)) {
+lastUpdate = trigger.get().toBuilder().nextExecutionDate(nextEvaluationDate).build();
-this.triggerState.update(lastUpdate);
+this.triggerState.update(lastUpdate);
+}
}
// Used for schedulableNextDate
FlowWithWorkerTrigger flowWithWorkerTrigger = FlowWithWorkerTrigger.builder()
@@ -399,31 +409,35 @@ public abstract class AbstractScheduler implements Scheduler, Service {
ClusterEvent clusterEvent = either.getLeft();
log.info("Cluster event received: {}", clusterEvent);
switch (clusterEvent.eventType()) {
-case MAINTENANCE_ENTER -> {
-this.executionQueue.pause();
-this.triggerQueue.pause();
-this.workerJobQueue.pause();
-this.workerTriggerResultQueue.pause();
-this.executionKilledQueue.pause();
-this.pauseAdditionalQueues();
-this.isPaused.set(true);
-this.setState(ServiceState.MAINTENANCE);
-}
-case MAINTENANCE_EXIT -> {
-this.executionQueue.resume();
-this.triggerQueue.resume();
-this.workerJobQueue.resume();
-this.workerTriggerResultQueue.resume();
-this.executionKilledQueue.resume();
-this.resumeAdditionalQueues();
-this.isPaused.set(false);
-this.setState(ServiceState.RUNNING);
-}
+case MAINTENANCE_ENTER -> enterMaintenance();
+case MAINTENANCE_EXIT -> exitMaintenance();
}
}
private void enterMaintenance() {
this.executionQueue.pause();
this.triggerQueue.pause();
this.workerJobQueue.pause();
this.workerTriggerResultQueue.pause();
this.executionKilledQueue.pause();
this.pauseAdditionalQueues();
this.isPaused.set(true);
this.setState(ServiceState.MAINTENANCE);
}
private void exitMaintenance() {
this.executionQueue.resume();
this.triggerQueue.resume();
this.workerJobQueue.resume();
this.workerTriggerResultQueue.resume();
this.executionKilledQueue.resume();
this.resumeAdditionalQueues();
this.isPaused.set(false);
this.setState(ServiceState.RUNNING);
}
protected void resumeAdditionalQueues() {
// by default: do nothing
}
@@ -996,6 +1010,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
}
setState(ServiceState.TERMINATING);
+this.receiveCancellations.forEach(Runnable::run);
+ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));
+ExecutorsUtils.closeScheduledThreadPool(executionMonitorExecutor, Duration.ofSeconds(5), List.of(executionMonitorFuture));
try {
if (onClose != null) {
onClose.run();
@@ -1003,9 +1020,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
} catch (Exception e) {
log.error("Unexpected error while terminating scheduler.", e);
}
-this.receiveCancellations.forEach(Runnable::run);
-this.scheduleExecutor.shutdown();
-this.executionMonitorExecutor.shutdown();
setState(ServiceState.TERMINATED_GRACEFULLY);
if (log.isDebugEnabled()) {

View File

@@ -5,6 +5,7 @@ import com.amazon.ion.IonSystem;
import com.amazon.ion.system.*;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
@@ -36,6 +37,8 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import static com.fasterxml.jackson.core.StreamReadConstraints.DEFAULT_MAX_STRING_LEN;
public final class JacksonMapper {
public static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
public static final TypeReference<List<Object>> LIST_TYPE_REFERENCE = new TypeReference<>() {};
@@ -43,6 +46,12 @@ public final class JacksonMapper {
private JacksonMapper() {}
static {
StreamReadConstraints.overrideDefaultStreamReadConstraints(
StreamReadConstraints.builder().maxNameLength(DEFAULT_MAX_STRING_LEN).build()
);
}
private static final ObjectMapper MAPPER = JacksonMapper.configure(
new ObjectMapper()
);
@@ -52,7 +61,7 @@ public final class JacksonMapper {
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
public static ObjectMapper ofJson() {
return MAPPER;
return JacksonMapper.ofJson(true);
}
public static ObjectMapper ofJson(boolean strict) {

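A quick, hedged illustration of the non-strict mapper (assuming, as the surrounding diff suggests, that the ofJson(false) variant is the one configured with FAIL_ON_UNKNOWN_PROPERTIES disabled, which is what the HttpClient change earlier in this compare relies on when mapping arbitrary response bodies):

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.serializers.JacksonMapper;

class OfJsonExample {
    public static class Ping {
        public String id; // public field: no extra Jackson configuration needed
    }

    public static void main(String[] args) throws Exception {
        // The non-strict mapper tolerates fields the target type does not declare.
        ObjectMapper lenient = JacksonMapper.ofJson(false);
        Ping ping = lenient.readValue("{\"id\":\"42\",\"unexpected\":true}", Ping.class);
        System.out.println(ping.id); // 42
    }
}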
View File

@@ -1,6 +1,7 @@
package io.kestra.core.server;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@@ -8,9 +9,11 @@ import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +28,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
protected final ServerConfig serverConfig;
private final AtomicBoolean isStopped = new AtomicBoolean(false);
private ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> scheduledFuture;
private Instant lastScheduledExecution;
/**
@@ -98,7 +102,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
Duration scheduleInterval = getScheduleInterval();
log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval);
scheduledExecutorService.scheduleAtFixedRate(
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
this,
0,
scheduleInterval.toSeconds(),
@@ -133,20 +137,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
@Override
public void close() {
if (isStopped.compareAndSet(false, true) && scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
if (scheduledExecutorService.isTerminated()) {
return;
}
try {
if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Timeout", name);
}
log.debug("Stopped scheduled '{}' task.", name);
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Interrupted.", name);
}
ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(scheduledFuture));
}
}
}

View File

@@ -111,7 +111,7 @@ public interface Service extends AutoCloseable {
* </pre>
*/
enum ServiceState {
CREATED(1, 2, 3), // 0
CREATED(1, 2, 3, 4, 9), // 0
RUNNING(2, 3, 4, 9), // 1
ERROR(4), // 2
DISCONNECTED(4, 7), // 3
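In this enum, each constant's constructor arguments are the ordinals of the states it may legally transition to, so the change lets a service still in CREATED move directly to the states at ordinals 4 and 9 (their names fall outside this fragment). A standalone sketch of the pattern, with made-up state names:
// Standalone sketch (made-up names): transition targets encoded as ordinals.
enum SketchState {
    CREATED(1, 2),   // may move to RUNNING or FAILED
    RUNNING(2),      // may move to FAILED
    FAILED();        // terminal

    private final int[] targets;

    SketchState(int... targets) {
        this.targets = targets;
    }

    boolean canTransitionTo(SketchState next) {
        for (int ordinal : targets) {
            if (ordinal == next.ordinal()) {
                return true;
            }
        }
        return false;
    }
}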

View File

@@ -215,9 +215,10 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
stateLock.lock();
// Optional callback to be executed at the end.
Runnable returnCallback = null;
localServiceState = localServiceState(service);
try {
localServiceState = localServiceState(service);
if (localServiceState == null) {
return null; // service has been unregistered.
}
@@ -266,7 +267,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
// Update the local instance
this.serviceRegistry.register(localServiceState.with(remoteInstance));
} catch (Exception e) {
final ServiceInstance localInstance = localServiceState(service).instance();
final ServiceInstance localInstance = localServiceState.instance();
log.error("[Service id={}, type='{}', hostname='{}'] Failed to update state to {}. Error: {}",
localInstance.uid(),
localInstance.type(),
@@ -282,7 +283,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
returnCallback.run();
}
}
return localServiceState(service).instance();
return Optional.ofNullable(localServiceState(service)).map(LocalServiceState::instance).orElse(null);
}
private void mayDisableStateUpdate(final Service service, final ServiceInstance instance) {
@@ -336,9 +337,11 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
final Service service,
final ServiceInstance instance,
final boolean isLivenessEnabled) {
// Never shutdown STANDALONE server or WEB_SERVER service.
if (instance.server().type().equals(ServerInstance.Type.STANDALONE) ||
instance.is(ServiceType.WEBSERVER)) {
// Never shutdown STANDALONE server or WEBSERVER and INDEXER services.
if (ServerInstance.Type.STANDALONE.equals(instance.server().type()) ||
instance.is(ServiceType.INDEXER) ||
instance.is(ServiceType.WEBSERVER)
) {
// Force the RUNNING state.
return Optional.of(instance.state(Service.ServiceState.RUNNING, now, null));
}

View File

@@ -212,7 +212,16 @@ public class ExecutionService {
// We need to remove global error tasks and flowable error tasks if any
flow
.allErrorsWithChilds()
.allErrorsWithChildren()
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// We need to remove global finally tasks and flowable finally tasks if any
flow
.allFinallyWithChildren()
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// We need to remove afterExecution tasks
ListUtils.emptyOnNull(flow.getAfterExecution())
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// Build and launch new execution
@@ -315,6 +324,32 @@ public class ExecutionService {
return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
}
public Execution changeTaskRunState(final Execution execution, Flow flow, String taskRunId, State.Type newState) throws Exception {
Execution newExecution = markAs(execution, flow, taskRunId, newState);
// if the execution was terminated, it may have executed errors/finally/afterExecution tasks; we must remove them as the execution will be restarted
if (execution.getState().isTerminated()) {
List<TaskRun> newTaskRuns = newExecution.getTaskRunList();
// We need to remove global error tasks and flowable error tasks if any
flow
.allErrorsWithChildren()
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// We need to remove global finally tasks and flowable finally tasks if any
flow
.allFinallyWithChildren()
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// We need to remove afterExecution tasks
ListUtils.emptyOnNull(flow.getAfterExecution())
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
return newExecution.withTaskRunList(newTaskRuns);
} else {
return newExecution;
}
}
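Hedged usage sketch of the new method (executionService, execution, flow and failedTaskRunId are assumed to be in scope): flipping a task run's state on a terminated execution clears any errors/finally/afterExecution task runs so they are re-evaluated when the execution restarts.
// Hedged usage sketch; surrounding variables assumed in scope.
Execution updated = executionService.changeTaskRunState(
    execution,           // a terminated execution
    flow,                // its flow definition
    failedTaskRunId,     // the task run whose state we change
    State.Type.SUCCESS   // e.g. mark it successful before restarting
);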
public Execution markAs(final Execution execution, FlowInterface flow, String taskRunId, State.Type newState) throws Exception {
return this.markAs(execution, flow, taskRunId, newState, null);
}

View File

@@ -176,7 +176,7 @@ public class FlowService {
previous :
FlowWithSource.of(flowToImport.toBuilder().revision(previous.getRevision() + 1).build(), source)
)
.orElseGet(() -> FlowWithSource.of(flowToImport, source).toBuilder().revision(1).build());
.orElseGet(() -> FlowWithSource.of(flowToImport, source).toBuilder().tenantId(tenantId).revision(1).build());
} else {
return maybeExisting
.map(previous -> repository().update(flow, previous))

View File

@@ -0,0 +1,14 @@
package io.kestra.core.services;
import jakarta.inject.Singleton;
@Singleton
public class MaintenanceService {
/**
* @return true if the cluster is in maintenance mode
*/
public boolean isInMaintenanceMode() {
// maintenance mode is an EE feature
return false;
}
}
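Since maintenance mode is an EE feature, this OSS bean is a stub that always answers false. A hedged sketch, not actual EE code, of how a Micronaut module could swap in a real implementation with @Replaces:
// Hedged sketch, not actual EE code.
import io.micronaut.context.annotation.Replaces;
import jakarta.inject.Singleton;
import java.util.concurrent.atomic.AtomicBoolean;

@Singleton
@Replaces(MaintenanceService.class)
public class EeMaintenanceService extends MaintenanceService {

    // hypothetical in-memory flag; a real implementation would read cluster state
    private final AtomicBoolean maintenance = new AtomicBoolean(false);

    @Override
    public boolean isInMaintenanceMode() {
        return maintenance.get();
    }
}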

View File

@@ -168,18 +168,15 @@ public class PluginDefaultService {
try {
return this.injectAllDefaults(flow, false);
} catch (Exception e) {
RunContextLogger
.logEntries(
Execution.loggingEventFromException(e),
LogEntry.of(execution)
)
.forEach(logEntry -> {
try {
logQueue.emitAsync(logEntry);
} catch (QueueException e1) {
// silently do nothing
}
});
try {
logQueue.emitAsync(RunContextLogger
.logEntries(
Execution.loggingEventFromException(e),
LogEntry.of(execution)
));
} catch (QueueException e1) {
// silently do nothing
}
return readWithoutDefaultsOrThrow(flow);
}
}

View File

@@ -5,16 +5,19 @@ import io.kestra.core.test.TestState;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.net.URI;
import java.util.List;
public record UnitTestResult(
@NotNull
String unitTestId,
String testId,
@NotNull
String unitTestType,
String testType,
@NotNull
String executionId,
@NotNull
URI url,
@NotNull
TestState state,
@NotNull
List<AssertionResult> assertionResults,
@@ -22,14 +25,13 @@ public record UnitTestResult(
List<AssertionRunError> errors,
Fixtures fixtures
) {
public static UnitTestResult of(String unitTestId, String unitTestType, String executionId, List<AssertionResult> results, List<AssertionRunError> errors, @Nullable Fixtures fixtures) {
public static UnitTestResult of(String unitTestId, String unitTestType, String executionId, URI url, List<AssertionResult> results, List<AssertionRunError> errors, @Nullable Fixtures fixtures) {
TestState state;
if(!errors.isEmpty()){
state = TestState.ERROR;
} else {
state = results.stream().anyMatch(assertion -> !assertion.isSuccess()) ? TestState.FAILED : TestState.SUCCESS;
}
return new UnitTestResult(unitTestId, unitTestType, executionId, state, results, errors, fixtures);
return new UnitTestResult(unitTestId, unitTestType, executionId, url, state, results, errors, fixtures);
}
}
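Hedged usage sketch of the updated factory (all identifiers below are made up): with no errors and no failed assertions the derived state is SUCCESS, and the new URI parameter carries a link to the execution that ran the test.
// Hedged usage sketch; every literal below is made up.
UnitTestResult result = UnitTestResult.of(
    "my-unit-test",                          // unitTestId
    "io.kestra.core.tests.UnitTest",         // unitTestType (hypothetical type name)
    "exec-123",                              // executionId
    URI.create("https://kestra.example.com/ui/executions/exec-123"),
    List.of(),                               // assertion results -> none failed
    List.of(),                               // errors -> none, so state = SUCCESS
    null                                     // fixtures are optional
);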

View File

@@ -3,12 +3,16 @@ package io.kestra.core.utils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.*;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class ExecutorsUtils {
@Inject
private ThreadMainFactoryBuilder threadFactoryBuilder;
@@ -61,6 +65,29 @@ public class ExecutorsUtils {
);
}
public static void closeScheduledThreadPool(ScheduledExecutorService scheduledExecutorService, Duration gracePeriod, List<ScheduledFuture<?>> taskFutures) {
scheduledExecutorService.shutdown();
if (scheduledExecutorService.isTerminated()) {
return;
}
try {
if (!scheduledExecutorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
log.warn("Failed to shutdown the ScheduledThreadPoolExecutor during grace period, forcing it to shutdown now");
// Ensure the scheduled task reaches a terminal state to avoid possible memory leak
ListUtils.emptyOnNull(taskFutures).forEach(taskFuture -> taskFuture.cancel(true));
scheduledExecutorService.shutdownNow();
}
log.debug("Stopped scheduled ScheduledThreadPoolExecutor.");
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
log.debug("Failed to shutdown the ScheduledThreadPoolExecutor.");
}
}
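Hedged usage sketch mirroring the call sites above: the caller keeps the ScheduledFuture returned by scheduleAtFixedRate so the helper can cancel it, and thereby drive the task to a terminal state, if the pool does not terminate within the grace period.
// Hedged usage sketch.
ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> heartbeat = pool.scheduleAtFixedRate(
    () -> { /* periodic work */ }, 0, 10, TimeUnit.SECONDS
);
// ... later, on shutdown:
ExecutorsUtils.closeScheduledThreadPool(pool, Duration.ofSeconds(5), List.of(heartbeat));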
private ExecutorService wrap(String name, ExecutorService executorService) {
return ExecutorServiceMetrics.monitor(
meterRegistry,

View File

@@ -73,7 +73,7 @@ public class GraphUtils {
)))
.orElse(Collections.emptyMap());
triggersDeclarations.forEach(trigger -> {
triggersDeclarations.stream().filter(trigger -> trigger != null).forEach(trigger -> {
GraphTrigger triggerNode = new GraphTrigger(trigger, triggersById.get(trigger.getId()));
triggerCluster.addNode(triggerNode);
triggerCluster.addEdge(triggerCluster.getRoot(), triggerNode, new Relation());

View File

@@ -1,5 +1,7 @@
package io.kestra.core.utils;
import io.kestra.core.models.Setting;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.micronaut.context.env.Environment;
import io.micronaut.context.env.PropertiesPropertySourceLoader;
import io.micronaut.context.env.PropertySource;
@@ -29,6 +31,9 @@ public class VersionProvider {
@Inject
private Environment environment;
@Inject
private Optional<SettingRepositoryInterface> settingRepository; // repositories are not always available in unit tests
@PostConstruct
public void start() {
final Optional<PropertySource> gitProperties = new PropertiesPropertySourceLoader()
@@ -40,6 +45,18 @@ public class VersionProvider {
this.revision = loadRevision(gitProperties);
this.date = loadTime(gitProperties);
this.version = loadVersion(buildProperties, gitProperties);
// check the version in the settings and update it if needed; we don't use it yet, but it would allow us to detect an incompatible update later if needed
if (settingRepository.isPresent()) {
Optional<Setting> versionSetting = settingRepository.get().findByKey(Setting.INSTANCE_VERSION);
if (versionSetting.isEmpty() || !versionSetting.get().getValue().equals(this.version)) {
settingRepository.get().save(Setting.builder()
.key(Setting.INSTANCE_VERSION)
.value(this.version)
.build()
);
}
}
}
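Nothing reads the persisted value yet, but here is a hedged sketch (not existing code) of how a later startup could use it to detect a version change before overwriting it; settingRepository, currentVersion and log are assumed in scope.
// Hedged sketch, not existing code.
Optional<Setting> stored = settingRepository.findByKey(Setting.INSTANCE_VERSION);
boolean versionChanged = stored
    .map(setting -> !setting.getValue().equals(currentVersion))
    .orElse(false);
if (versionChanged) {
    log.info("Instance version changed from {} to {}", stored.get().getValue(), currentVersion);
}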
private String loadVersion(final Optional<PropertySource> buildProperties,

Some files were not shown because too many files have changed in this diff.