Compare commits

...

181 Commits

Author SHA1 Message Date
github-actions[bot]
b09b1fdafe chore(version): update to version '0.23.14' 2025-09-02 12:21:34 +00:00
Roman Acevedo
adcab1893b ci: fix setversion-tag.yml not triggering a main.yml job on a pushed tag
the missing token: ${{ secrets.GH_PERSONAL_TOKEN }} is the only difference between this CI and EE CI, so it is probably the right fix

# Conflicts:
#	.github/workflows/setversion-tag.yml
2025-09-02 10:40:47 +02:00
Loïc Mathieu
5710c79954 fix(executions): clear errors/finally/afterExecution branches when changing the state of a taskrun
As changing the state of a taskrun will restart the flow, if we didn't clear those branches, the flow would not resart properly.

Fixes https://github.com/kestra-io/kestra-ee/issues/3211
2025-08-29 16:25:29 +02:00
github-actions[bot]
55a2384253 chore(version): update to version '0.23.13' 2025-08-26 10:35:39 +00:00
brian.mulier
4975c907a7 fix(logs): emitAsync is now keeping messages order 2025-08-25 16:54:13 +02:00
brian.mulier
87d508648d fix(logs): higher max message length to keep stacktraces in a single log 2025-08-25 16:53:34 +02:00
brian.mulier
85da1089ec chore(deps): bump Micronaut platform to 4.9.2
closes #10626
closes #10788
2025-08-25 16:53:34 +02:00
Loïc Mathieu
68e1b9c80f fix(system): properly close the ScheduledExecutorService tasks
This avoids having running threads while the component is supposed to be closed.
2025-08-20 15:52:48 +02:00
nKwiatkowski
21c24e0349 fix: retry flaky test 2025-08-20 11:51:48 +02:00
nKwiatkowski
ed8a908b22 fix: retry flaky test 2025-08-20 11:50:49 +02:00
Nicolas K.
86d97bed77 fix(test): disable kafka concurrency queue test (#10755)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-19 18:12:59 +02:00
nKwiatkowski
f2c3cf4f8c chore: update version to 0.23.12 2025-08-19 15:17:56 +02:00
brian.mulier
b3c896eccb fix(core): change cache policy on files returned by webserver that needs to stay fresh
closes #7499
2025-08-19 13:13:53 +02:00
brian.mulier
58d36f5948 fix(dashboards): quick fix to avoid infinite refresh and restore refresh dashboard feature 2025-08-19 13:05:31 +02:00
Florian Hussonnois
922a655a4c fix(core): fix preconditions rendering for ExecutionOutputs (#10651)
Ensure that preconditions are always re-rendered for any
new executions

Changes:
* add new fluent skipCache methods on RunContextProperty and Property
  classes

Fix: #10651
2025-08-18 21:00:06 +02:00
YannC.
94f0e211ba fix: compilation issue 2025-08-14 15:45:19 +02:00
Loïc Mathieu
2b590bf955 fix(execution): parallel flowable may not ends all child flowable
Parallel flowable tasks like `Parallel`, `Dag` and `ForEach` are racy. When a task fail in a branch, other concurrent branches that have flowable may never ends.
We make sure that all children are terminated when a flowable is itself terminated.

Fixes #6780
2025-08-14 12:25:56 +02:00
Loïc Mathieu
b61eeaff8c fix(execution): concurrency limit didn't work with afterExecutions
This is because the execution is never considered fully terminated so concurrency limit is not handled properly.
This should also affect SLA, trigger lock, and other cleaning stuff.

The root issue is that, with a worker task from afterExecution, there are no other update on the execution itself (as it's already terminated) so no execution messages are again processed by the executor.

Because of that, the worker task result message from the afterExecution block is the last message, but unfortunatly as messages from the worker task result have no flow attached, the computation of the final termination is incorrect.
The solution is to load the flow if null inside the executor and the execution is terminated which should only occurs inside afterExecution.

Fixes #10657
Fixes #8459
Fixes #8609
2025-08-13 09:32:25 +02:00
Prayag
26d7fa47d3 fix(core): Enter key is now validating filter / refreshing data (#9630)
closes #9471

---------

Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-08-12 17:29:14 +02:00
Loïc Mathieu
bece420c9a fix(executions): SLA monitor should take into account restarted executions 2025-08-12 11:49:14 +02:00
Loïc Mathieu
081066888f fix(executions): concurrency limit exceeded when restarting an execution
Fixes #7880
2025-08-12 11:49:02 +02:00
YannC.
c721fe68a7 chore: update version to 0.23.11 2025-08-12 10:47:34 +02:00
Loïc Mathieu
88c77084f5 fix(executions): correctly fail the request when trying to resume an execution with the wrong inputs
Fixes #9959
2025-08-12 09:40:23 +02:00
brian.mulier
3846ac87e3 fix(dashboard): properly use time filters in queries
closes kestra-io/kestra-ee#4389
2025-08-11 22:29:56 +02:00
Nicolas K.
b9c843f01d feat(releases): add test jar to meven central deployment (#10675)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-11 17:17:06 +02:00
brian.mulier
9686718130 tests(core): add a test to taskrunners to ensure it's working multiple times on the same working directory
part of kestra-io/plugin-ee-kubernetes#45
2025-08-11 15:00:32 +02:00
Loïc Mathieu
da40f46b4a fix(executions): properly fail the task if it contains unsupported unicode sequence
This occurs in Postgres using the `\u0000` unicode sequence. Postgres refuse to store any JSONB with this sequence as it has no textual representation.
We now properly detect that and fail the task.

Fixes #10326
2025-08-11 11:54:35 +02:00
Piyush Bhaskar
4ae207ed81 fix(flows): copy trigger url propely. (#10645) 2025-08-08 13:14:49 +05:30
brian.mulier
0a2fa4d3b2 fix(core): ensure props with defaults are not marked as required in generated doc 2025-08-07 15:09:46 +02:00
brian.mulier
995d5c1ac2 fix(core): wrong @NotNull import leading to key not being marked as required
closes #9287
2025-08-07 15:09:46 +02:00
Loïc Mathieu
92bf135c02 fix(test): RestactCaseTest.restartFailedWithFinally() should use executionService.isTerminated() 2025-08-06 09:56:43 +02:00
brian.mulier
d318281342 chore(version): upgrade to version 0.23.10 2025-08-05 10:56:31 +02:00
Loïc Mathieu
3f68749276 fix(executions): Don't create outputs from the Subflow task when we didn't wait
As, well, if we didn't wait for the subflow execution, we cannot have access to its outputs.
2025-07-31 13:07:46 +02:00
brian.mulier
bc07dfbf1c fix(core): avoid follow execution from being discarded too early
closes #10472
closes #7623
2025-07-31 10:26:16 +02:00
Piyush Bhaskar
3b3aa495b0 fix(core): update running count and status of executions in concurrency (#10418) 2025-07-29 18:30:40 +05:30
Loïc Mathieu
a945780e4d chore(version): upgrade to version 0.23.9 2025-07-29 14:57:00 +02:00
Roman Acevedo
d512f86927 fix(cli): tenantService was injected directly, this is not working in cli 2025-07-29 14:42:33 +02:00
github-actions[bot]
7f355dd730 chore(core): localize to languages other than english (#10405)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-29 10:50:29 +02:00
Loïc Mathieu
ffa33b1a7a chore(version): version 0.23.8 2025-07-29 10:11:12 +02:00
Roman Acevedo
a5b4ec3b2e fix(triggers): bulk action on triggers did not take into account this is async (#10307) 2025-07-29 09:31:43 +02:00
Miloš Paunović
5585e9df47 fix(namespaces): make sure the namespace parameter is properly passed when reading a file (#10384)
Relates to https://github.com/kestra-io/kestra/issues/10363.
Relates to https://github.com/kestra-io/kestra-ee/issues/4514.
2025-07-29 09:20:29 +02:00
YannC
f8cb335a16 fix: set postgres and mysql queue offset as a bigint (#10344) 2025-07-28 16:28:47 +02:00
Loïc Mathieu
af9129f900 fix(core): compilation issue 2025-07-28 15:08:57 +02:00
yuri
177ba35e15 fix(core): amend misc label-related issues (#10044)
* fix(core): amend misc label-related issues

* re-enabled bulk update of label value
* re-enabled merging flow-execution labels by key
* made duplicated keys rejection readable
* forced multiple validations within `RequestUtils`
* ensured existing labels can be overriden
* added multiple tests validating complex scenarios

BREAKING CHANGE: switched from first to last label value override
BREAKING CHANGE: preventing empty key/value labels
BREAKING CHANGE: preventing whitespace in key

* fix(core): reflect feedback

* Deduplicated a list inside the `Labels` task.
* Worked around label mutation at `Worker`.
* Attempted to deduplicate labels within `Execution` as possible.

* fix(core): remove irrelevant changes
2025-07-28 11:31:19 +02:00
Florian Hussonnois
b99946deff fix(system): avoid potential NPE in ServiceLivenessManager (#10338)
Avoid a potential NPE in ServiceLivenessManager when
a local service is unregistered during shutdown before the liveness probe completes

Fix: #10338
2025-07-25 12:34:51 +02:00
Florian Hussonnois
19428ad344 fix(system): ignore state transition failure for indexer
Fix: kestra-io/kestra-ee#4474
2025-07-25 12:34:44 +02:00
Loïc Mathieu
162764ff0d fix(executions): flow concurrency limit not honors when executions are created at a high rate
This is due to the fact that we now process the execution queue concurrently so there is a race when counting currently running executions. This can be seen easily using a ForEachItem as it could create tens or hundreds of executions almost instantly leading to almost all those executions started as they would all see 0 executions running...

Using a dedicated execution running queue, as done in EE, would serialize the messages and fix the issue.

However, if using multiple executor instances and concurrency limit = 1, there is a theoretical race as no locks will be done if no execution is running. A max surge of executions could be as high as the number of executor but this race is less probable to happen in real world scenario.

Fixes #10167
2025-07-25 12:06:26 +02:00
Florian Hussonnois
ccd7b43b97 fix(core): fix search filtering based on endDate field (kestra-io/kestra-ee#4446)
Related-to: kestra-io/kestra-ee#4446
2025-07-25 11:14:35 +02:00
Loïc Mathieu
53f881ed60 fix(executions): race condition inside nested ForEach with concurrency
Fixes #10167
2025-07-25 09:46:46 +02:00
Piyush Bhaskar
0759aaeae8 fix(executions): update query parameter for state filtering (#10315) 2025-07-24 14:42:01 +05:30
Miloš Paunović
fc8b389d09 fix(executions): make sure outputs do not overflow over right drawer (#10238)
Closes https://github.com/kestra-io/kestra/issues/10232.
2025-07-24 10:40:22 +02:00
MilosPaunovic
8355eb191e chore(flows): show small execution charts on flow listing
Co-authored-by: YannC. <ycoornaert@kestra.io>
2025-07-24 10:34:32 +02:00
Piyush Bhaskar
50f72f8ea3 fix(core): check null uri (#10309) 2025-07-24 12:48:42 +05:30
Nicolas K.
ae14d980a4 fix(cli): #10062 add tenant to load flows properly at the startup (#10290)
* fix(cli): #10062 add tenant to load flows properly at the startup

* fix(cli): #10062 add fallback tenant to ee service

* fix(cli): #10062 use tenant id in all cli

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-23 15:36:42 +02:00
Florian Hussonnois
bc1a08b418 chore(version): bump to version '0.23.7' 2025-07-22 15:50:22 +02:00
Nicolas K.
e264c0b75d fix(pebble): #8953 add more flexible day number conversion method (#10205)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-22 14:59:24 +02:00
Piyush Bhaskar
fccbb6b648 fix(logs): update query filter to show logs ns and flowwise (#10248) 2025-07-22 18:04:27 +05:30
MilosPaunovic
a243c563d3 fix(namespaces)*: prevent overwriting namespace file content with undefined string
Relates to https://github.com/kestra-io/kestra-ee/issues/4439.
2025-07-22 14:28:28 +02:00
Loïc Mathieu
45ad1f6ee4 fix(tests): strengthen awaitExecution predicate
In some test situation, awaitExecution may receive old messages so we strenghten the predicate to be sure to wait for the correct execution: the one that ends successfully
2025-07-22 12:47:40 +02:00
Piyush Bhaskar
8359bfc680 fix(triggers): ensure clearing the selection. (#10245) 2025-07-22 16:00:31 +05:30
YannC.
30a808188c fix: handle label filter with and instead or for flow
close #4390
2025-07-22 09:42:06 +02:00
Loïc Mathieu
5121ceb63a fixx(system): compilation issue 2025-07-21 12:36:03 +02:00
Loïc Mathieu
1dae994910 fix(executions)*: restart with finally or afterExecution
When a flow fail and is restarted and contains either a finally or an afterExecution block, those are not resetted so the restart will skip all task and terminate the flow.
The fix will reset the status of those tasks so they are restarted.

Fixes #10155
2025-07-21 12:27:54 +02:00
Loïc Mathieu
26a82fce95 fix(executions): support unicode file name inside the internal storage
Fixes #9550
2025-07-21 12:27:25 +02:00
Bart Ledoux
a8584a8a33 feat(flows): add setting to disable hovers in editor 2025-07-17 10:43:05 +02:00
Piyush Bhaskar
5737216b34 fix(triggers): only updates the trigger that matches both flowId and triggerId (#10156) 2025-07-17 14:10:44 +05:30
YannC.
747c424f1f chore(version): upgrade to v0.23.6 2025-07-15 14:52:54 +02:00
brian-mulier-p
33bfc979c5 fix(core): trim expressions in select & multiselect to be able to use '|' instead of '>-' (#10017)
closes #10016
2025-07-09 16:39:02 +02:00
nKwiatkowski
58ceb66cfb chore(version): upgrade to v0.23.5 2025-07-08 15:18:25 +02:00
Loïc Mathieu
a08266593f fix(webserver)*: bulk set labels remove existing labels
FIxes #9764
2025-07-07 15:26:09 +02:00
Loïc Mathieu
d5d5f457b4 fix(system): force running after execution tasks even if the execution is killed
Fixes #9852
2025-07-07 12:41:31 +02:00
François Delbrayelle
cacac2239d fix(taskrunner): abstract task runner (#9769) 2025-07-04 09:50:08 +02:00
nKwiatkowski
5c45bd5eb5 feat(cicd): #4006 add javadoc and sources to cli release 2025-07-03 14:58:12 +02:00
Miloš Paunović
fdf126202c fix(namespaces)*: take pagination into account when browsing namespace flows (#9849)
Closes https://github.com/kestra-io/kestra/issues/9805.
2025-07-02 11:48:53 +02:00
Nicolas K.
0f3c745bb9 feat(cicd): #4006 change signing method (#9854)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-02 11:14:07 +02:00
YannC.
5a6a0ff3e3 chore(version): upgrade to v0.23.4 2025-07-01 17:56:42 +02:00
Loïc Mathieu
f5f88e18ce feat(cluster): persist maintenance mode in the database
Part-of: https://github.com/kestra-io/kestra-ee/issues/3735
2025-07-01 17:56:42 +02:00
Nicolas K.
12f521860e feat(cicd): #4006 migrate to maven central (#9807)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-01 16:19:11 +02:00
Nicolas K.
b6cf3e1f93 feat(cicd): #4006 migrate sonatype to maven central (#9803)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-01 15:02:05 +02:00
YannC.
7125885ea9 fix(triggers): correctly replace the update triggers when disabling 2025-07-01 14:21:22 +02:00
YannC.
0b29a4a247 feat(triggers): avoid clearing selection when refreshing in triggers list 2025-07-01 14:21:22 +02:00
Piyush Bhaskar
0377f87c66 feat(tenant): all routes on /main tenant 2025-07-01 11:57:13 +05:30
Loïc Mathieu
06bd0c6380 fix(system)*: mitigate possible deadlock for execution delay and SLA
In case multiple instances of the executor are started, the execution delay loop and the monitoring SLA loop have a risk of duplicate execution resume or execution SLA violation computation.
This could create some race conditions and duplicate execution update.
But this may also risk to create some deadlocks as two instances of the executor may try to lock the same exection to restart it (or fail it due to SLA).
2025-06-30 14:33:54 +02:00
brian.mulier
cd39995f24 fix(core): use namespace prefix instead of equals
On the namespace/flows, namespace/executions pages and when having a default namespace on Logs page

closes kestra-io/kestra-ee#4200
2025-06-25 17:48:54 +02:00
Loïc Mathieu
938e156bd5 chore(system): call the close runnable later 2025-06-25 14:37:46 +02:00
brian.mulier
1fb7943738 chore(version): update to version '0.23.3' 2025-06-24 17:33:04 +02:00
brian-mulier-p
09d648cf86 fix(variables): put fixtures files with arbitrary key and extract it back as root level "files" variable (#9689) 2025-06-24 17:32:37 +02:00
brian.mulier
02a22faed4 chore(version): update to version '0.23.2' 2025-06-24 14:19:20 +02:00
Ludovic DEHON
169d6610f5 test(core): fix falling test on schedule 2025-06-24 14:19:20 +02:00
Loïc Mathieu
e253958cf4 fix(system): possible NPE on trigger when computing variables 2025-06-24 14:19:20 +02:00
brian-mulier-p
c75f06a036 fix: avoid failure to deserialize json objects that have unknown fields with http client (#9668)
closes #9667
2025-06-24 14:19:20 +02:00
Loïc Mathieu
b3b1b7a5cb feat(executions)*: add tasks to set and unset execution variables
Closes #9555
2025-06-24 14:19:20 +02:00
Loïc Mathieu
34e07b9e2b fix(execution): parent flow never ends when subflow fail due to SLA
This is because the executor didn't have the flow inside it so the execution is not correctly terminated.
It may fix other issues (like flow triggers, purge, ...)

Fixes #9618
2025-06-20 18:04:12 +02:00
Loïc Mathieu
85b449c926 fix(system): flow graph fail to be created while editting a flow
Fixes #9551

It is not the validation per se that fail, it's the graph dependency computation that is also done while editing a flow that fail.
2025-06-20 12:09:18 +02:00
Loïc Mathieu
0017ead9b3 fix(system)*: runIf inside a WorkingDirectory can crash the Worker
Fixes #9639
2025-06-20 12:09:04 +02:00
Barthélémy Ledoux
b0292f02f7 fix(ui): default value for expression cannot be null (#9636) 2025-06-20 11:12:32 +02:00
Piyush Bhaskar
202dc7308d feat(namespaces): show ns description (#9610)
* feat(namespaces): show ns description

* add slot and data for description
2025-06-20 13:59:03 +05:30
François Delbrayelle
3273a9a40c fix(plugin-versioning): replace current JAR if more recent (#9629) 2025-06-20 09:51:21 +02:00
Loïc Mathieu
bd303f4529 fix(system): support allowFailure and allowWarning for the Pause task
Fixes #9416
2025-06-19 17:34:38 +02:00
Barthélémy Ledoux
db57326f0f tests: nocode editor (#9624) 2025-06-19 14:21:15 +02:00
github-actions[bot]
90a576490f chore(version): update to version '0.23.1' 2025-06-19 10:32:53 +00:00
Loïc Mathieu
2cdd968100 feat(system): store version in the settings 2025-06-19 12:23:20 +02:00
Barthélémy Ledoux
adfc3bf526 perf(ui): load a sample schema while waiting (#9558) 2025-06-19 11:34:15 +02:00
Nicolas K.
3a61f9b1ba Fix/tutorial flows with migration (#9620)
* fix(core): #9609 delete tutorial flows and triggers before migrating the database

* fix(core): #9609 delete tutorial flows and triggers before migrating the database for EE version

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-06-19 10:58:29 +02:00
YannC
64e3014426 fix: correctly use default tenant when synchronizing file with local (#9605)
close #9568
2025-06-19 10:04:58 +02:00
François Delbrayelle
1f68e5f4ed fix(podman): do not pass the tag directly to pullImageCmd (withTag) (#9607) 2025-06-18 18:50:54 +02:00
François Delbrayelle
9bfa888e36 fix(plugin): FileSystems.newFileSystem caused a Path component should be / in plugins tests (#9570) 2025-06-18 16:03:45 +02:00
github-actions[bot]
691a77538a chore(version): update to version '0.23.0' 2025-06-17 09:35:23 +00:00
Bart Ledoux
b07086f553 chore: update ui-libs 2025-06-17 11:21:21 +02:00
Ludovic DEHON
ee12c884e9 fix(tasks): sleep example are a full one 2025-06-16 15:02:34 +02:00
Barthélémy Ledoux
712d6da84f fix(ui): make file panel appear beside main panel in namespace (#9546) 2025-06-16 14:45:05 +02:00
Bart Ledoux
fcc5fa2056 fix: package-lock 2025-06-16 14:44:01 +02:00
Loïc Mathieu
dace30ded7 fix(system): compilation issue 2025-06-16 14:18:55 +02:00
github-actions[bot]
2b578f0f94 chore(version): update to version '0.23.0-rc5-SNAPSHOT' 2025-06-16 12:05:27 +00:00
Florian Hussonnois
91f958b26b fix(executor): delete WorkerJobRunning for any terminated task (#9493)
Make ExecutorService responsible for deleting WorkerJobRunning
when a terminated TaskRun is added to an execution.

Changes:
 - Remove unecessary read before delete on WorkerJobRunning table.

Close: #9493
2025-06-16 14:03:11 +02:00
Bart Ledoux
d7fc6894fe tests: fix storybook tests 2025-06-16 13:29:34 +02:00
Bart Ledoux
c286348d27 fix(ui): make array and KV Pairs work in nocode 2025-06-16 12:17:23 +02:00
brian.mulier
de4ec49721 fix(core): yaml utils migration 2025-06-16 11:18:47 +02:00
Barthélémy Ledoux
1966ac6012 fix: cleanup empty metadata to fix variable creation (#9529) 2025-06-16 11:17:52 +02:00
Barthélémy Ledoux
a293a37ec9 fix(ui): nocode API calls on EE needs tenant (#9527) 2025-06-16 11:17:43 +02:00
Barthélémy Ledoux
f295724bb6 fix: small tweaks on tabs (#9520) 2025-06-16 11:17:34 +02:00
Barthélémy Ledoux
06505ad977 fix(ui): snafu on duplicate input pair (#9514) 2025-06-16 11:15:30 +02:00
Barthélémy Ledoux
cb31ef642f fix(ui): [nocode] make dag tasks work (#9506) 2025-06-16 11:14:17 +02:00
Barthélémy Ledoux
c320323371 fix(ui): nocode updating inputs from yaml (#9430) 2025-06-16 11:12:35 +02:00
Barthélémy Ledoux
a190cdd0e7 fix(ui): add datepicker to nocode string field (#9351)
Co-authored-by: GitHub Action <actions@github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2025-06-16 11:12:27 +02:00
Barthélémy Ledoux
0678f7c5e9 fix(ui): rename namespace field (#9492) 2025-06-16 11:08:05 +02:00
Barthélémy Ledoux
f39ba5c95e fix(ui): prevent cursor change in Editor component when modelValue is updated from outside (#9371) 2025-06-16 11:07:55 +02:00
Karuna Tata
b4e334c5d8 feat(ui): drag and convert tabs to panels (#9198)
Co-authored-by: Barthélémy Ledoux <bledoux@kestra.io>
2025-06-16 11:07:37 +02:00
Bart Ledoux
561380c942 fix(ui): restore add button as a button 2025-06-16 11:07:25 +02:00
Satvik Kushwaha
68b4867b5a fix(ui): make download and preview visible for text ouputs (#8348)
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
2025-06-16 11:06:24 +02:00
Barthélémy Ledoux
cb7f99d107 fix(ui): variables should work with duplicated keys (#9425) 2025-06-16 11:05:17 +02:00
Barthélémy Ledoux
efac7146ff fix: properly detect condition fields (#9353) 2025-06-16 11:02:41 +02:00
Barthélémy Ledoux
11de42c0b8 fix(ui): nocode - open onPause in a new tab (#9366) 2025-06-16 11:02:31 +02:00
Barthélémy Ledoux
b58d9e10dd fix: initialize array fields without any value (#9367) 2025-06-16 11:00:04 +02:00
Barthélémy Ledoux
e25e70d37e refactor: load nocode root form from server schema (#9327) 2025-06-16 10:59:53 +02:00
Karuna Tata
f2dac28997 fix(ui): clear selection of retry form radio buttons (#9268)
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
thank you so much for this geat work ! ❤️
2025-06-16 10:59:44 +02:00
Barthélémy Ledoux
0ac8819d95 fix(ui): allow key of sub-tasks to be other than tasks (#9333) 2025-06-16 10:59:24 +02:00
Ludovic DEHON
d261de0df3 fix(core): robots.txt was not served
close kestra-io/kestra#9015
2025-06-13 23:01:48 +02:00
brian.mulier
02cac65614 fix(core): filters was triggering endless refresh
closes #9508
2025-06-13 16:25:34 +02:00
MilosPaunovic
5064687b7e fix(core)*: make sure tour always opens with code & topology tabs visible (#9513)
Closes https://github.com/kestra-io/kestra-ee/issues/4073.
2025-06-13 08:55:20 +02:00
YannC
7c8419b266 fix(ui): Better duplicate key management in the pair component (#9431)
* fix(ui): Better duplicate key mananage in the pair component

close #9220

* fix(ui): add a have-error prop on inputText that show a red shadow

* refactor: simplify inputpair component (#9491)

* fix: only show lock if disabled

* alertState define order

---------

Co-authored-by: Barthélémy Ledoux <bledoux@kestra.io>
2025-06-12 13:28:02 +02:00
Roman Acevedo
84e4c62c6d fix(tests): test editor was showing previous shown plugin doc
fixes https://github.com/kestra-io/kestra-ee/issues/4066
2025-06-12 13:21:29 +02:00
Nicolas K.
9aa605e23b Feat/rework compatibility layer (#9490)
* feat(core): rework compatibility layer

* feat(core): #4062 rework compatibility layer

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-06-12 10:42:49 +02:00
Roman Acevedo
faa77aed79 feat(tests): add execution url in test result 2025-06-12 10:03:05 +02:00
brian-mulier-p
fdce552528 feat(core): introduce tasksWithState autocompletion (#9485)
part of #8350
2025-06-12 09:55:57 +02:00
brian.mulier
a028a61792 fix(core): avoid infinite load upon route redirect (#9480)
closes #9479
2025-06-11 17:03:52 +02:00
brian.mulier
023a77a320 fix(core): properly map labels filters from query (#9480)
closes #9324
2025-06-11 17:03:52 +02:00
brian.mulier
bfee04bca2 fix(core): prevent incompatible timeRange & start/endDate filters + prevent multiple scope filters (#9480)
closes #9240
2025-06-11 17:03:52 +02:00
YannC
3756f01bdf fix(ui): base the required prop on the requiredProperties list (#9433)
close #9377
2025-06-11 13:09:27 +02:00
YannC
c1240d7391 feat(ui): allow to close a tab with mouse middle click like in a navigator/ide (#9434) 2025-06-11 08:55:13 +02:00
YannC
ac37ae6032 fix(core): use Min annotation instead of Positive (#9432)
close #9380
2025-06-10 17:15:11 +02:00
github-actions[bot]
9e51b100b0 chore(version): update to version '0.23.0-rc3-SNAPSHOT' 2025-06-10 12:51:54 +00:00
Miloš Paunović
bc81e01608 fix(core)*: properly display chart colors for logs (#9429) 2025-06-10 13:51:56 +02:00
YannC.
9f2162c942 feat(): add Kestra plugin in the list 2025-06-10 12:44:09 +02:00
brian-mulier-p
97992d99ee fix(core): handle properly dot in nested keys & commas in quoted filter values (#9410) 2025-06-10 11:55:30 +02:00
brian.mulier
f90f6b8429 chore(deps): bump vitest to 3.2.3 2025-06-10 11:55:30 +02:00
brian.mulier
0f7360ae81 build(tests): replace workspaces with proper storybook config + working aliases 2025-06-10 11:53:11 +02:00
Florian Hussonnois
938590f31f fix(plugins): check whether plugin registry support versioning (#9122) 2025-06-10 11:49:40 +02:00
YannC.
b2d1c84a86 fix(): display correctly doc/chart preview when editing custom dashboard
close #9411
2025-06-10 10:25:41 +02:00
Ludovic DEHON
d7ca302830 feat(system): add server_type as global metrics tags 2025-06-10 09:23:14 +02:00
Roman Acevedo
8656e852cc build(ci): fix setversion workflow not making tag push trigger main 2025-06-09 18:03:49 +02:00
brian-mulier-p
cc72336350 fix(core): avoid adding invalid keys from query parameters to filter (#9383)
closes #9364
2025-06-09 18:03:49 +02:00
Roman Acevedo
316d89764e tests(core): add storybook on executions filters (#9354) 2025-06-09 18:03:49 +02:00
Barthélémy Ledoux
4873bf4d36 chore: upgrade storybook (#9326) 2025-06-09 14:40:21 +02:00
Florian Hussonnois
204bf7f5e1 chore: add script to update gradle kestraVersion prop on plugins 2025-06-09 14:31:45 +02:00
Loïc Mathieu
1e0950fdf8 fix(system): import flow should set the tenantId 2025-06-09 13:51:53 +02:00
github-actions[bot]
4cddc704f4 chore(version): update to version '0.23.0-rc2-SNAPSHOT' 2025-06-09 10:48:43 +00:00
Miloš Paunović
f2f0e29f93 fix(namespaces): properly load flows when changing namespace (#9393)
Closes https://github.com/kestra-io/kestra/issues/9352.
2025-06-09 12:34:36 +02:00
Miloš Paunović
95011e022e fix(namespaces): reload namespace once the id parameter changes (#9372)
Closes https://github.com/kestra-io/kestra-ee/issues/3630.
2025-06-06 12:25:37 +02:00
brian.mulier
65503b708a fix(core): add DefaultFilterLanguage as default in KestraFilter
closes #9365
2025-06-05 17:42:34 +02:00
brian-mulier-p
876b8cb2e6 fix(core): avoid crashing in case of taskrun having too large value (#9359)
closes #9312
2025-06-05 14:11:37 +02:00
Nicolas K.
f3b7592dfa fix(flows): #9319 error when puase with timeout trigger an execution (#9334)
* fix(flows): #9319 error when puase with timeout trigger an execution even after it's terminated

* fix(flows): only skip paused flow when execution is terminated

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-06-05 10:15:49 +02:00
brian.mulier
4dbeaf86bb fix(core): larger debounce for filter 2025-06-05 09:48:53 +02:00
brian.mulier
f98e78399d fix(core): handle whitespaces in label key and value 2025-06-05 09:48:43 +02:00
brian.mulier
71dac0f311 fix(core): smarter autocomplete order in editor 2025-06-05 09:48:00 +02:00
brian-mulier-p
3077d0ac7a fix(core): additional plugins are now properly shown in plugin docs (#9329)
closes kestra-io/plugin-langchain4j#61
2025-06-05 09:46:57 +02:00
YannC.
9504bbaffe fix(ci): put back bump helm chart and remove if condition 2025-06-05 08:48:56 +02:00
YannC.
159c9373ad fix(ci): checkout actions from main branch 2025-06-04 21:12:56 +02:00
YannC.
55b9088b55 fix(ci): modify actions order 2025-06-04 21:06:17 +02:00
YannC.
601d1a0abb fix(ci): Correctly pass all the secrets through all workflows 2025-06-04 15:10:33 +02:00
Florian Hussonnois
4a1cf98f26 chore(version): bump to version '0.23.0-rc1-SNAPSHOT' 2025-06-04 14:07:30 +02:00
329 changed files with 8525 additions and 4296 deletions

View File

@@ -43,6 +43,9 @@ jobs:
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
end:
runs-on: ubuntu-latest

View File

@@ -22,11 +22,11 @@ jobs:
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)(\.[0-9]+)-(rc[0-9])?(-SNAPSHOT)?$"
exit 1
fi
# Extract the major and minor versions
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
RELEASE_BRANCH="refs/heads/releases/v${BASE_VERSION}.x"
CURRENT_BRANCH="$GITHUB_REF"
if ! [[ "$CURRENT_BRANCH" == "$RELEASE_BRANCH" ]]; then
echo "Invalid release branch. Expected $RELEASE_BRANCH, was $CURRENT_BRANCH"
@@ -34,11 +34,14 @@ jobs:
fi
# Checkout
- uses: actions/checkout@v4
- name: Checkout
uses: actions/checkout@v5
with:
fetch-depth: 0
token: ${{ secrets.GH_PERSONAL_TOKEN }}
- name: Configure Git
# Configure
- name: Git - Configure
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
@@ -54,4 +57,4 @@ jobs:
git commit -m"chore(version): update to version '$RELEASE_VERSION'"
git push
git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
git push origin "v$RELEASE_VERSION"
git push --tags

View File

@@ -6,23 +6,15 @@ on:
GH_PERSONAL_TOKEN:
description: "The Github personal token."
required: true
push:
tags:
- '*'
SLACK_RELEASES_WEBHOOK_URL:
description: "The Slack webhook URL."
required: true
jobs:
publish:
name: Github - Release
runs-on: ubuntu-latest
steps:
# Download Exec
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
if: startsWith(github.ref, 'refs/tags/v')
with:
name: exe
path: build/executable
# Check out
- name: Checkout - Repository
uses: actions/checkout@v4
@@ -36,11 +28,20 @@ jobs:
with:
repository: kestra-io/actions
sparse-checkout-cone-mode: true
ref: fix/core-release
path: actions
sparse-checkout: |
.github/actions
# Download Exec
# Must be done after checkout actions
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
if: startsWith(github.ref, 'refs/tags/v')
with:
name: exe
path: build/executable
# GitHub Release
- name: Create GitHub release
uses: ./actions/.github/actions/github-release
@@ -49,3 +50,16 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
# Trigger gha workflow to bump helm chart version
- name: GitHub - Trigger the Helm chart version bump
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/helm-charts
event-type: update-helm-chart-version
client-payload: |-
{
"new_version": "${{ github.ref_name }}",
"github_repository": "${{ github.repository }}",
"github_actor": "${{ github.actor }}"
}

View File

@@ -39,8 +39,8 @@ jobs:
- name: Publish - Release package to Maven Central
shell: bash
env:
ORG_GRADLE_PROJECT_sonatypeUsername: ${{ secrets.SONATYPE_USER }}
ORG_GRADLE_PROJECT_sonatypePassword: ${{ secrets.SONATYPE_PASSWORD }}
ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE}}
@@ -50,7 +50,7 @@ jobs:
echo "signing.password=${SONATYPE_GPG_PASSWORD}" >> ~/.gradle/gradle.properties
echo "signing.secretKeyRingFile=${HOME}/.gradle/secring.gpg" >> ~/.gradle/gradle.properties
echo ${SONATYPE_GPG_FILE} | base64 -d > ~/.gradle/secring.gpg
./gradlew publishToSonatype ${{ startsWith(github.ref, 'refs/tags/v') && 'closeAndReleaseSonatypeStagingRepository' || '' }}
./gradlew publishToMavenCentral
# Gradle dependency
- name: Java - Gradle dependency graph

View File

@@ -42,6 +42,12 @@ on:
SONATYPE_GPG_FILE:
description: "The Sonatype GPG file."
required: true
GH_PERSONAL_TOKEN:
description: "The Github personal token."
required: true
SLACK_RELEASES_WEBHOOK_URL:
description: "The Slack webhook URL."
required: true
jobs:
build-artifacts:
name: Build - Artifacts
@@ -77,4 +83,5 @@ jobs:
if: startsWith(github.ref, 'refs/tags/v')
uses: ./.github/workflows/workflow-github-release.yml
secrets:
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}

View File

@@ -61,6 +61,7 @@
#plugin-jenkins:io.kestra.plugin:plugin-jenkins:LATEST
#plugin-jira:io.kestra.plugin:plugin-jira:LATEST
#plugin-kafka:io.kestra.plugin:plugin-kafka:LATEST
#plugin-kestra:io.kestra.plugin:plugin-kestra:LATEST
#plugin-kubernetes:io.kestra.plugin:plugin-kubernetes:LATEST
#plugin-langchain4j:io.kestra.plugin:plugin-langchain4j:LATEST
#plugin-ldap:io.kestra.plugin:plugin-ldap:LATEST

View File

@@ -31,12 +31,10 @@ plugins {
id 'com.github.node-gradle.node' version '7.1.0'
// release
id "io.github.gradle-nexus.publish-plugin" version "2.0.0"
id 'net.researchgate.release' version '3.1.0'
id "com.gorylenko.gradle-git-properties" version "2.5.0"
id 'signing'
id 'ru.vyarus.pom' version '3.0.0' apply false
id 'ru.vyarus.github-info' version '2.0.0' apply false
id "com.vanniktech.maven.publish" version "0.33.0"
// OWASP dependency check
id "org.owasp.dependencycheck" version "12.1.1" apply false
@@ -414,6 +412,7 @@ distTar.dependsOn shadowJar
startScripts.dependsOn shadowJar
startShadowScripts.dependsOn jar
shadowJar.dependsOn 'ui:assembleFrontend'
shadowJar.dependsOn jar
/**********************************************************************************************************************\
* Executable Jar
@@ -484,24 +483,11 @@ tasks.register('runStandalone', JavaExec) {
/**********************************************************************************************************************\
* Publish
**********************************************************************************************************************/
nexusPublishing {
repositoryDescription = "${project.group}:${rootProject.name}:${project.version}"
useStaging = !project.version.endsWith("-SNAPSHOT")
repositories {
sonatype {
nexusUrl.set(uri("https://s01.oss.sonatype.org/service/local/"))
snapshotRepositoryUrl.set(uri("https://s01.oss.sonatype.org/content/repositories/snapshots/"))
}
}
}
subprojects {subProject ->
subprojects {
if (it.name != 'jmh-benchmarks') {
apply plugin: "maven-publish"
if (subProject.name != 'jmh-benchmarks' && subProject.name != rootProject.name) {
apply plugin: 'signing'
apply plugin: 'ru.vyarus.pom'
apply plugin: 'ru.vyarus.github-info'
apply plugin: "com.vanniktech.maven.publish"
javadoc {
options {
@@ -535,56 +521,120 @@ subprojects {
}
}
github {
user 'kestra-io'
license 'Apache'
repository 'kestra'
site 'https://kestra.io'
//These modules should not be published
def unpublishedModules = ["jdbc-mysql", "jdbc-postgres", "webserver"]
if (subProject.name in unpublishedModules){
return
}
maven.pom {
description = 'The modern, scalable orchestrator & scheduler open source platform'
mavenPublishing {
publishToMavenCentral(true)
signAllPublications()
developers {
developer {
id = "tchiotludo"
name = "Ludovic Dehon"
coordinates(
"${rootProject.group}",
subProject.name == "cli" ? rootProject.name : subProject.name,
"${rootProject.version}"
)
pom {
name = project.name
description = "${project.group}:${project.name}:${rootProject.version}"
url = "https://github.com/kestra-io/${rootProject.name}"
licenses {
license {
name = "The Apache License, Version 2.0"
url = "http://www.apache.org/licenses/LICENSE-2.0.txt"
}
}
developers {
developer {
id = "tchiotludo"
name = "Ludovic Dehon"
email = "ldehon@kestra.io"
}
}
scm {
connection = 'scm:git:'
url = "https://github.com/kestra-io/${rootProject.name}"
}
}
}
publishing {
publications {
sonatypePublication(MavenPublication) {
version project.version
afterEvaluate {
publishing {
publications {
withType(MavenPublication).configureEach { publication ->
if (project.name.contains('cli')) {
groupId "io.kestra"
artifactId "kestra"
artifact shadowJar
artifact executableJar
} else if (project.name.contains('platform')){
groupId project.group
artifactId project.name
} else {
from components.java
groupId project.group
artifactId project.name
artifact sourcesJar
artifact javadocJar
artifact testsJar
if (subProject.name == "platform") {
// Clear all artifacts except the BOM
publication.artifacts.clear()
}
}
}
}
}
signing {
// only sign JARs that we publish to Sonatype
required { gradle.taskGraph.hasTask("publishSonatypePublicationPublicationToSonatypeRepository") }
sign publishing.publications.sonatypePublication
if (subProject.name == 'cli') {
/* Make sure the special publication is wired *after* every plugin */
subProject.afterEvaluate {
/* 1. Remove the default java component so Gradle stops expecting
the standard cli-*.jar, sources, javadoc, etc. */
components.removeAll { it.name == "java" }
/* 2. Replace the publications artifacts with shadow + exec */
publishing.publications.withType(MavenPublication).configureEach { pub ->
pub.artifacts.clear()
// main shadow JAR built at root
pub.artifact(rootProject.tasks.named("shadowJar").get()) {
extension = "jar"
}
// executable ZIP built at root
pub.artifact(rootProject.tasks.named("executableJar").get().archiveFile) {
classifier = "exec"
extension = "zip"
}
pub.artifact(tasks.named("sourcesJar").get())
pub.artifact(tasks.named("javadocJar").get())
}
/* 3. Disable Gradle-module metadata for this publication to
avoid the “artifact removed from java component” error. */
tasks.withType(GenerateModuleMetadata).configureEach { it.enabled = false }
/* 4. Make every publish task in :cli wait for the two artifacts */
tasks.matching { it.name.startsWith("publish") }.configureEach {
dependsOn rootProject.tasks.named("shadowJar")
dependsOn rootProject.tasks.named("executableJar")
}
}
}
if (subProject.name != 'platform' && subProject.name != 'cli') {
// only if a test source set actually exists (avoids empty artifacts)
def hasTests = subProject.extensions.findByName('sourceSets')?.findByName('test') != null
if (hasTests) {
// wire the artifact onto every Maven publication of this subproject
publishing {
publications {
withType(MavenPublication).configureEach { pub ->
// keep the normal java component + sources/javadoc already configured
pub.artifact(subProject.tasks.named('testsJar').get())
}
}
}
// make sure publish tasks build the tests jar first
tasks.matching { it.name.startsWith('publish') }.configureEach {
dependsOn subProject.tasks.named('testsJar')
}
}
}
tasks.withType(GenerateModuleMetadata).configureEach {
@@ -595,6 +645,7 @@ subprojects {
}
/**********************************************************************************************************************\
* Version
**********************************************************************************************************************/

View File

@@ -37,4 +37,4 @@ dependencies {
//test
testImplementation "org.wiremock:wiremock-jetty12"
}
}

View File

@@ -1,7 +1,5 @@
package io.kestra.cli;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.HttpHeaders;
import io.micronaut.http.HttpRequest;
@@ -16,16 +14,15 @@ import io.micronaut.http.netty.body.NettyJsonHandler;
import io.micronaut.json.JsonMapper;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import lombok.Builder;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
import picocli.CommandLine;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import lombok.Builder;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
import picocli.CommandLine;
public abstract class AbstractApiCommand extends AbstractCommand {
@CommandLine.Option(names = {"--server"}, description = "Kestra server url", defaultValue = "http://localhost:8080")
@@ -37,7 +34,7 @@ public abstract class AbstractApiCommand extends AbstractCommand {
@CommandLine.Option(names = {"--user"}, paramLabel = "<user:password>", description = "Server user and password")
protected String user;
@CommandLine.Option(names = {"--tenant"}, description = "Tenant identifier (EE only, when multi-tenancy is enabled)")
@CommandLine.Option(names = {"--tenant"}, description = "Tenant identifier (EE only)")
protected String tenantId;
@CommandLine.Option(names = {"--api-token"}, description = "API Token (EE only).")
@@ -87,12 +84,12 @@ public abstract class AbstractApiCommand extends AbstractCommand {
return request;
}
protected String apiUri(String path) {
protected String apiUri(String path, String tenantId) {
if (path == null || !path.startsWith("/")) {
throw new IllegalArgumentException("'path' must be non-null and start with '/'");
}
return tenantId == null ? "/api/v1/" + MAIN_TENANT + path : "/api/v1/" + tenantId + path;
return "/api/v1/" + tenantId + path;
}
@Builder

View File

@@ -1,5 +1,6 @@
package io.kestra.cli;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.serializers.YamlParser;
@@ -9,6 +10,7 @@ import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.io.IOException;
@@ -31,6 +33,9 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
@CommandLine.Parameters(index = "0", description = "the directory containing files to check")
protected Path directory;
@Inject
private TenantIdSelectorService tenantService;
/** {@inheritDoc} **/
@Override
protected boolean loadExternalPlugins() {
@@ -112,7 +117,7 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows/validate"), body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows/validate", tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
List<ValidateConstraintViolation> validations = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -2,11 +2,13 @@ package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -23,6 +25,9 @@ public class FlowCreateCommand extends AbstractApiCommand {
@CommandLine.Parameters(index = "0", description = "The file containing the flow")
public Path flowFile;
@Inject
private TenantIdSelectorService tenantService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
@@ -34,7 +39,7 @@ public class FlowCreateCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows"), body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows", tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -2,10 +2,12 @@ package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -23,6 +25,9 @@ public class FlowDeleteCommand extends AbstractApiCommand {
@CommandLine.Parameters(index = "1", description = "The ID of the flow")
public String id;
@Inject
private TenantIdSelectorService tenantService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
@@ -30,7 +35,7 @@ public class FlowDeleteCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.DELETE(apiUri("/flows/" + namespace + "/" + id ));
.DELETE(apiUri("/flows/" + namespace + "/" + id, tenantService.getTenantId(tenantId)));
client.toBlocking().exchange(
this.requestOptions(request)

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.micronaut.context.ApplicationContext;
import io.kestra.cli.services.TenantIdSelectorService;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
@@ -25,9 +25,8 @@ import java.nio.file.Path;
public class FlowExportCommand extends AbstractApiCommand {
private static final String DEFAULT_FILE_NAME = "flows.zip";
// @FIXME: Keep it for bug in micronaut that need to have inject on top level command to inject on abstract classe
@Inject
private ApplicationContext applicationContext;
private TenantIdSelectorService tenantService;
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of flows to export")
public String namespace;
@@ -41,7 +40,7 @@ public class FlowExportCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<Object> request = HttpRequest
.GET(apiUri("/flows/export/by-query") + (namespace != null ? "?namespace=" + namespace : ""))
.GET(apiUri("/flows/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
.accept(MediaType.APPLICATION_OCTET_STREAM);
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);

View File

@@ -1,7 +1,8 @@
package io.kestra.cli.commands.flows;
import com.google.common.collect.ImmutableMap;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
@@ -30,7 +31,7 @@ import java.util.concurrent.TimeoutException;
description = "Test a flow"
)
@Slf4j
public class FlowTestCommand extends AbstractCommand {
public class FlowTestCommand extends AbstractApiCommand {
@Inject
private ApplicationContext applicationContext;
@@ -76,6 +77,7 @@ public class FlowTestCommand extends AbstractCommand {
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
FlowInputOutput flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
RunnerUtils runnerUtils = applicationContext.getBean(RunnerUtils.class);
TenantIdSelectorService tenantService = applicationContext.getBean(TenantIdSelectorService.class);
Map<String, Object> inputs = new HashMap<>();
@@ -89,7 +91,7 @@ public class FlowTestCommand extends AbstractCommand {
try {
runner.run();
repositoryLoader.load(file.toFile());
repositoryLoader.load(tenantService.getTenantId(tenantId), file.toFile());
List<Flow> all = flowRepository.findAllForAllTenants();
if (all.size() != 1) {

View File

@@ -2,11 +2,13 @@ package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -29,6 +31,9 @@ public class FlowUpdateCommand extends AbstractApiCommand {
@CommandLine.Parameters(index = "2", description = "The ID of the flow")
public String id;
@Inject
private TenantIdSelectorService tenantService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
@@ -40,7 +45,7 @@ public class FlowUpdateCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.PUT(apiUri("/flows/" + namespace + "/" + id ), body).contentType(MediaType.APPLICATION_YAML);
.PUT(apiUri("/flows/" + namespace + "/" + id, tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
@@ -9,6 +10,7 @@ import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -36,6 +38,9 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
@CommandLine.Option(names = {"--namespace"}, description = "The parent namespace of the flows, if not set, every namespace are allowed.")
public String namespace;
@Inject
private TenantIdSelectorService tenantIdSelectorService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
@@ -66,7 +71,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
namespaceQuery = "&namespace=" + namespace;
}
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows/bulk") + "?allowNamespaceChild=true&delete=" + delete + namespaceQuery, body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows/bulk", tenantIdSelectorService.getTenantId(tenantId)) + "?allowNamespaceChild=true&delete=" + delete + namespaceQuery, body).contentType(MediaType.APPLICATION_YAML);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -1,6 +1,7 @@
package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.services.FlowService;
@@ -22,6 +23,9 @@ public class FlowValidateCommand extends AbstractValidateCommand {
@Inject
private FlowService flowService;
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
return this.call(
@@ -35,7 +39,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
FlowWithSource flow = (FlowWithSource) object;
List<String> warnings = new ArrayList<>();
warnings.addAll(flowService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
warnings.addAll(flowService.warnings(flow, this.tenantId));
warnings.addAll(flowService.warnings(flow, tenantService.getTenantId(tenantId)));
return warnings;
},
(Object object) -> {

View File

@@ -3,6 +3,7 @@ package io.kestra.cli.commands.flows.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.commands.flows.IncludeHelperExpander;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
@@ -10,6 +11,7 @@ import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -30,6 +32,9 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
@CommandLine.Option(names = {"--override-namespaces"}, negatable = true, description = "Replace namespace of all flows by the one provided")
public boolean override = false;
@Inject
private TenantIdSelectorService tenantService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
@@ -59,7 +64,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
}
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows/") + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -2,12 +2,14 @@ package io.kestra.cli.commands.namespaces.files;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.utils.KestraIgnore;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.multipart.MultipartBody;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -34,6 +36,9 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
@CommandLine.Option(names = {"--delete"}, negatable = true, description = "Whether missing should be deleted")
public boolean delete = false;
@Inject
private TenantIdSelectorService tenantService;
private static final String KESTRA_IGNORE_FILE = ".kestraignore";
@Override
@@ -44,7 +49,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
try (var files = Files.walk(from); DefaultHttpClient client = client()) {
if (delete) {
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/") + namespace + "/files?path=" + to, null)));
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + to, null)));
}
KestraIgnore kestraIgnore = new KestraIgnore(from);
@@ -62,7 +67,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
client.toBlocking().exchange(
this.requestOptions(
HttpRequest.POST(
apiUri("/namespaces/") + namespace + "/files?path=" + destination,
apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + destination,
body
).contentType(MediaType.MULTIPART_FORM_DATA)
)

View File

@@ -3,11 +3,13 @@ package io.kestra.cli.commands.namespaces.kv;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.JacksonMapper;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Option;
@@ -42,6 +44,9 @@ public class KvUpdateCommand extends AbstractApiCommand {
@Option(names = {"-f", "--file-value"}, description = "The file from which to read the value to set. If this is provided, it will take precedence over any specified value.")
public Path fileValue;
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
super.call();
@@ -56,7 +61,7 @@ public class KvUpdateCommand extends AbstractApiCommand {
Duration ttl = expiration == null ? null : Duration.parse(expiration);
MutableHttpRequest<String> request = HttpRequest
.PUT(apiUri("/namespaces/") + namespace + "/kv/" + key, value)
.PUT(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/kv/" + key, value)
.contentType(MediaType.APPLICATION_JSON_TYPE);
if (ttl != null) {

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.cli.services.FileChangedEventListener;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
@@ -44,6 +45,9 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = {"-f", "--flow-path"}, description = "the flow path containing flow to inject at startup (when running with a memory flow repository)")
private File flowPath;
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
private String tenantId;
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to four times the number of available processors. Set it to 0 to avoid starting a worker.")
private int workerThread = defaultWorkerThread();
@@ -98,7 +102,8 @@ public class StandAloneCommand extends AbstractServerCommand {
if (flowPath != null) {
try {
LocalFlowRepositoryLoader localFlowRepositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
localFlowRepositoryLoader.load(null, this.flowPath);
TenantIdSelectorService tenantIdSelectorService = applicationContext.getBean(TenantIdSelectorService.class);
localFlowRepositoryLoader.load(tenantIdSelectorService.getTenantId(this.tenantId), this.flowPath);
} catch (IOException e) {
throw new CommandLine.ParameterException(this.spec.commandLine(), "Invalid flow path", e);
}

View File

@@ -2,8 +2,8 @@ package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.context.ApplicationContext;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
@@ -27,9 +27,8 @@ import java.nio.file.Path;
public class TemplateExportCommand extends AbstractApiCommand {
private static final String DEFAULT_FILE_NAME = "templates.zip";
// @FIXME: Keep it for bug in micronaut that need to have inject on top level command to inject on abstract classe
@Inject
private ApplicationContext applicationContext;
private TenantIdSelectorService tenantService;
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
public String namespace;
@@ -43,7 +42,7 @@ public class TemplateExportCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<Object> request = HttpRequest
.GET(apiUri("/templates/export/by-query") + (namespace != null ? "?namespace=" + namespace : ""))
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
.accept(MediaType.APPLICATION_OCTET_STREAM);
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.serializers.YamlParser;
@@ -10,6 +11,7 @@ import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@@ -27,6 +29,9 @@ import jakarta.validation.ConstraintViolationException;
@TemplateEnabled
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
super.call();
@@ -44,7 +49,7 @@ public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpda
try (DefaultHttpClient client = client()) {
MutableHttpRequest<List<Template>> request = HttpRequest
.POST(apiUri("/templates/") + namespace + "?delete=" + delete, templates);
.POST(apiUri("/templates/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, templates);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -12,8 +12,8 @@ import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
import jakarta.inject.Inject;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
@@ -26,6 +26,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
@Singleton
@Slf4j
@Requires(property = "micronaut.io.watch.enabled", value = "true")
@@ -111,6 +113,8 @@ public class FileChangedEventListener {
}
public void startListening(List<Path> paths) throws IOException, InterruptedException {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
for (Path path : paths) {
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
}
@@ -189,6 +193,8 @@ public class FileChangedEventListener {
}
private void loadFlowsFromFolder(Path folder) {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
try {
Files.walkFileTree(folder, new SimpleFileVisitor<Path>() {
@Override
@@ -232,6 +238,8 @@ public class FileChangedEventListener {
}
private Optional<FlowWithSource> parseFlow(String content, Path entry) {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
try {
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(tenantId, content, false);
modelValidator.validate(flow);

View File

@@ -0,0 +1,19 @@
package io.kestra.cli.services;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import io.kestra.core.exceptions.KestraRuntimeException;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
@Singleton
public class TenantIdSelectorService {
//For override purpose in Kestra EE
public String getTenantId(String tenantId) {
if (StringUtils.isNotBlank(tenantId) && !MAIN_TENANT.equals(tenantId)){
throw new KestraRuntimeException("Tenant id can only be 'main'");
}
return MAIN_TENANT;
}
}

View File

@@ -15,6 +15,9 @@ micronaut:
static:
paths: classpath:static
mapping: /static/**
root:
paths: classpath:root
mapping: /**
server:
max-request-size: 10GB
multipart:

View File

@@ -108,6 +108,34 @@ class FlowCreateOrUpdateCommandTest {
}
}
@Test
void should_fail_with_incorrect_tenant() {
URL directory = FlowCreateOrUpdateCommandTest.class.getClassLoader().getResource("flows");
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"--tenant", "incorrect",
directory.getPath(),
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(err.toString()).contains("Tenant id can only be 'main'");
err.reset();
}
}
@Test
void helper() {
URL directory = FlowCreateOrUpdateCommandTest.class.getClassLoader().getResource("helper");

View File

@@ -21,7 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class PluginDocCommandTest {
public static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.18.0-SNAPSHOT.jar";
public static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.24.0-SNAPSHOT.jar";
@Test
void run() throws IOException, URISyntaxException {

View File

@@ -20,7 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class PluginListCommandTest {
private static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.18.0-SNAPSHOT.jar";
private static final String PLUGIN_TEMPLATE_TEST = "plugin-template-test-0.24.0-SNAPSHOT.jar";
@Test
void shouldListPluginsInstalledLocally() throws IOException, URISyntaxException {

View File

@@ -0,0 +1,33 @@
package io.kestra.cli.commands.servers;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.junit.jupiter.api.Test;
public class TenantIdSelectorServiceTest {
@Test
void should_fail_without_tenant_id() {
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] start = {
"server", "standalone",
"-f", "unused",
"--tenant", "wrong_tenant"
};
PicocliRunner.call(App.class, ctx, start);
assertThat(err.toString()).contains("Tenant id can only be 'main'");
err.reset();
}
}
}

View File

@@ -118,12 +118,13 @@ public class JsonSchemaGenerator {
if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
.map(JsonNode::asText)
.toList();
.collect(Collectors.toList());
properties.fields().forEachRemaining(e -> {
int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
requiredPropsNode.remove(indexInRequiredArray);
requiredFieldValues.remove(indexInRequiredArray);
}
});

View File

@@ -284,7 +284,7 @@ public class HttpClient implements Closeable {
} else if (cls.isAssignableFrom(Byte[].class)) {
return (T) ArrayUtils.toObject(EntityUtils.toByteArray(entity));
} else {
return (T) JacksonMapper.ofJson().readValue(entity.getContent(), cls);
return (T) JacksonMapper.ofJson(false).readValue(entity.getContent(), cls);
}
}

View File

@@ -1,11 +1,14 @@
package io.kestra.core.metrics;
import io.kestra.core.models.ServerType;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micronaut.configuration.metrics.aggregator.MeterRegistryConfigurer;
import io.micronaut.context.annotation.Requires;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@@ -15,20 +18,26 @@ public class GlobalTagsConfigurer implements MeterRegistryConfigurer<SimpleMeter
@Inject
MetricConfig metricConfig;
@Nullable
@Value("${kestra.server-type}")
ServerType serverType;
@Override
public void configure(SimpleMeterRegistry meterRegistry) {
if (metricConfig.getTags() != null) {
meterRegistry
.config()
.commonTags(
metricConfig.getTags()
.entrySet()
.stream()
.flatMap(e -> Stream.of(e.getKey(), e.getValue()))
.toList()
.toArray(String[]::new)
);
}
String[] tags = Stream
.concat(
metricConfig.getTags() != null ? metricConfig.getTags()
.entrySet()
.stream()
.flatMap(e -> Stream.of(e.getKey(), e.getValue())) : Stream.empty(),
serverType != null ? Stream.of("server_type", serverType.name()) : Stream.empty()
)
.toList()
.toArray(String[]::new);
meterRegistry
.config()
.commonTags(tags);
}
@Override

View File

@@ -1,11 +1,10 @@
package io.kestra.core.models;
import io.kestra.core.utils.MapUtils;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
public record Label(@NotNull String key, @NotNull String value) {
@@ -29,11 +28,36 @@ public record Label(@NotNull String key, @NotNull String value) {
* @return the nested {@link Map}.
*/
public static Map<String, Object> toNestedMap(List<Label> labels) {
Map<String, Object> asMap = labels.stream()
return MapUtils.flattenToNestedMap(toMap(labels));
}
/**
* Static helper method for converting a list of labels to a flat map.
* Key order is kept.
*
* @param labels The list of {@link Label} to be converted.
* @return the flat {@link Map}.
*/
public static Map<String, String> toMap(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyMap();
return labels.stream()
.filter(label -> label.value() != null && label.key() != null)
// using an accumulator in case labels with the same key exists: the first is kept
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> first));
return MapUtils.flattenToNestedMap(asMap);
// using an accumulator in case labels with the same key exists: the second is kept
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
}
/**
* Static helper method for deduplicating a list of labels by their key.
* Value of the last key occurrence is kept.
*
* @param labels The list of {@link Label} to be deduplicated.
* @return the deduplicated {@link List}.
*/
public static List<Label> deduplicate(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyList();
return toMap(labels).entrySet().stream()
.map(entry -> new Label(entry.getKey(), entry.getValue()))
.collect(Collectors.toCollection(ArrayList::new));
}
/**

View File

@@ -125,7 +125,7 @@ public record QueryFilter(
END_DATE("endDate") {
@Override
public List<Op> supportedOp() {
return List.of(Op.GREATER_THAN, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
}
},
STATE("state") {

View File

@@ -15,6 +15,8 @@ import jakarta.validation.constraints.NotNull;
@NoArgsConstructor
public class Setting {
public static final String INSTANCE_UUID = "instance.uuid";
public static final String INSTANCE_VERSION = "instance.version";
@NotNull
private String key;

View File

@@ -24,6 +24,7 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.LabelService;
import io.kestra.core.test.flow.TaskFixture;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
@@ -132,7 +133,7 @@ public class Execution implements DeletedInterface, TenantInterface {
}
public List<Label> getLabels() {
return Optional.ofNullable(this.labels).orElse(new ArrayList<>());
return ListUtils.emptyOnNull(this.labels);
}
/**
@@ -156,6 +157,7 @@ public class Execution implements DeletedInterface, TenantInterface {
.flowRevision(flow.getRevision())
.state(new State())
.scheduleDate(scheduleDate.map(ChronoZonedDateTime::toInstant).orElse(null))
.variables(flow.getVariables())
.build();
List<Label> executionLabels = new ArrayList<>(LabelService.labelsExcludingSystem(flow));
@@ -176,8 +178,22 @@ public class Execution implements DeletedInterface, TenantInterface {
}
/**
* Customization of Lombok-generated builder.
*/
public static class ExecutionBuilder {
/**
* Enforce unique values of {@link Label} when using the builder.
*
* @param labels The labels.
* @return Deduplicated labels.
*/
public ExecutionBuilder labels(List<Label> labels) {
this.labels = Label.deduplicate(labels);
return this;
}
void prebuild() {
this.originalId = this.id;
this.metadata = ExecutionMetadata.builder()
@@ -225,7 +241,6 @@ public class Execution implements DeletedInterface, TenantInterface {
}
public Execution withLabels(List<Label> labels) {
return new Execution(
this.tenantId,
this.id,
@@ -235,7 +250,7 @@ public class Execution implements DeletedInterface, TenantInterface {
this.taskRunList,
this.inputs,
this.outputs,
labels,
Label.deduplicate(labels),
this.variables,
this.state,
this.parentId,
@@ -365,7 +380,7 @@ public class Execution implements DeletedInterface, TenantInterface {
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedErrors finally tasks
* @param resolvedFinally finally tasks
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(
@@ -991,6 +1006,16 @@ public class Execution implements DeletedInterface, TenantInterface {
return result;
}
/**
* Find all children of this {@link TaskRun}.
*/
public List<TaskRun> findChildren(TaskRun parentTaskRun) {
return taskRunList.stream()
.filter(taskRun -> parentTaskRun.getId().equals(taskRun.getParentTaskRunId()))
.toList();
}
public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
return (withCurrent ?
Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :

View File

@@ -7,6 +7,7 @@ import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
@@ -62,6 +63,11 @@ public class TaskRun implements TenantInterface {
@With
Boolean dynamic;
// Set it to true to force execution even if the execution is killed
@Nullable
@With
Boolean forceExecution;
@Deprecated
public void setItems(String items) {
// no-op for backward compatibility
@@ -81,7 +87,8 @@ public class TaskRun implements TenantInterface {
this.outputs,
this.state.withState(state),
this.iteration,
this.dynamic
this.dynamic,
this.forceExecution
);
}
@@ -99,7 +106,8 @@ public class TaskRun implements TenantInterface {
this.outputs,
newState,
this.iteration,
this.dynamic
this.dynamic,
this.forceExecution
);
}
@@ -121,7 +129,8 @@ public class TaskRun implements TenantInterface {
this.outputs,
this.state.withState(State.Type.FAILED),
this.iteration,
this.dynamic
this.dynamic,
this.forceExecution
);
}

View File

@@ -1,20 +1,19 @@
package io.kestra.core.models.flows;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
@SuperBuilder
@Getter
@NoArgsConstructor
@Introspected
public class Concurrency {
@Positive
@Min(1)
@NotNull
private Integer limit;

View File

@@ -19,7 +19,6 @@ import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.FlowValidation;
import io.micronaut.core.annotation.Introspected;
@@ -30,8 +29,6 @@ import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.stream.Collectors;
@@ -187,19 +184,32 @@ public class Flow extends AbstractFlow implements HasUID {
.toList();
}
public List<Task> allErrorsWithChilds() {
public List<Task> allErrorsWithChildren() {
var allErrors = allTasksWithChilds().stream()
.filter(task -> task.isFlowable() && ((FlowableTask<?>) task).getErrors() != null)
.flatMap(task -> ((FlowableTask<?>) task).getErrors().stream())
.collect(Collectors.toCollection(ArrayList::new));
if (this.getErrors() != null && !this.getErrors().isEmpty()) {
if (!ListUtils.isEmpty(this.getErrors())) {
allErrors.addAll(this.getErrors());
}
return allErrors;
}
public List<Task> allFinallyWithChildren() {
var allFinally = allTasksWithChilds().stream()
.filter(task -> task.isFlowable() && ((FlowableTask<?>) task).getFinally() != null)
.flatMap(task -> ((FlowableTask<?>) task).getFinally().stream())
.collect(Collectors.toCollection(ArrayList::new));
if (!ListUtils.isEmpty(this.getFinally())) {
allFinally.addAll(this.getFinally());
}
return allFinally;
}
public Task findParentTasksByTaskId(String taskId) {
return allTasksWithChilds()
.stream()

View File

@@ -3,6 +3,7 @@ package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.tasks.Task;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
@@ -115,7 +116,7 @@ public class State {
}
public Instant maxDate() {
if (this.histories.size() == 0) {
if (this.histories.isEmpty()) {
return Instant.now();
}
@@ -123,7 +124,7 @@ public class State {
}
public Instant minDate() {
if (this.histories.size() == 0) {
if (this.histories.isEmpty()) {
return Instant.now();
}
@@ -167,6 +168,11 @@ public class State {
return this.current.isPaused();
}
@JsonIgnore
public boolean isQueued() {
return this.current.isQueued();
}
@JsonIgnore
public boolean isRetrying() {
return this.current.isRetrying();
@@ -200,6 +206,14 @@ public class State {
return this.histories.get(this.histories.size() - 2).state.isPaused();
}
/**
* Return true if the execution has failed, then was restarted.
* This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
*/
public boolean failedThenRestarted() {
return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
}
@Introspected
public enum Type {
CREATED,
@@ -253,9 +267,26 @@ public class State {
return this == Type.KILLED;
}
public boolean isQueued(){
return this == Type.QUEUED;
}
/**
* @return states that are terminal to an execution
*/
public static List<Type> terminatedTypes() {
return Stream.of(Type.values()).filter(type -> type.isTerminated()).toList();
}
/**
* Compute the final 'failure' of a task depending on <code>allowFailure</code> and <code>allowWarning</code>:
* - if both are true -> SUCCESS
* - if only <code>allowFailure</code> is true -> WARNING
* - if none -> FAILED
*/
public static State.Type fail(Task task) {
return task.isAllowFailure() ? (task.isAllowWarning() ? State.Type.SUCCESS : State.Type.WARNING) : State.Type.FAILED;
}
}
@Value

View File

@@ -108,7 +108,7 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
private List<String> renderExpressionValues(final Function<String, Object> renderer) {
Object result;
try {
result = renderer.apply(expression);
result = renderer.apply(expression.trim());
} catch (Exception e) {
throw ManualConstraintViolation.toConstraintViolationException(
"Cannot render 'expression'. Cause: " + e.getMessage(),

View File

@@ -86,7 +86,7 @@ public class SelectInput extends Input<String> implements RenderableInput {
private List<String> renderExpressionValues(final Function<String, Object> renderer) {
Object result;
try {
result = renderer.apply(expression);
result = renderer.apply(expression.trim());
} catch (Exception e) {
throw ManualConstraintViolation.toConstraintViolationException(
"Cannot render 'expression'. Cause: " + e.getMessage(),

View File

@@ -68,6 +68,19 @@ public class Property<T> {
String getExpression() {
return expression;
}
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
* <p>
* The returned property will still cache its rendered result.
* To re-evaluate on a subsequent render, call {@code skipCache()} again.
*
* @return a new {@link Property} without a pre-rendered value
*/
public Property<T> skipCache() {
return Property.ofExpression(expression);
}
/**
* Build a new Property object with a value already set.<br>

View File

@@ -30,7 +30,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
* Helper class for task runners and script tasks.
*/
public final class ScriptService {
private static final Pattern INTERNAL_STORAGE_PATTERN = Pattern.compile("(kestra:\\/\\/[-a-zA-Z0-9%._\\+~#=/]*)");
private static final Pattern INTERNAL_STORAGE_PATTERN = Pattern.compile("(kestra:\\/\\/[-\\p{Alnum}._\\+~#=/]*)", Pattern.UNICODE_CHARACTER_CLASS);
// These are the three common additional variables task runners must provide for variable rendering.
public static final String VAR_WORKING_DIR = "workingDir";

View File

@@ -329,6 +329,14 @@ public class DefaultPluginRegistry implements PluginRegistry {
pluginClassByIdentifier.clear();
}
/**
* {@inheritDoc}
**/
@Override
public boolean isVersioningSupported() {
return false;
}
public record PluginBundleIdentifier(@Nullable URL location) {
public static PluginBundleIdentifier CORE = new PluginBundleIdentifier(null);

View File

@@ -151,7 +151,7 @@ public class LocalPluginManager implements PluginManager {
* {@inheritDoc}
**/
@Override
public PluginArtifact install(File file, boolean installForRegistration, @Nullable Path localRepositoryPath) {
public PluginArtifact install(File file, boolean installForRegistration, @Nullable Path localRepositoryPath, boolean forceInstallOnExistingVersions) {
try {
PluginArtifact artifact = PluginArtifact.fromFile(file);
log.info("Installing managed plugin artifact '{}'", artifact);

View File

@@ -55,14 +55,16 @@ public interface PluginManager extends AutoCloseable {
/**
* Installs the given plugin artifact.
*
* @param file the plugin JAR file.
* @param installForRegistration specify whether plugin artifacts should be scanned and registered.
* @param localRepositoryPath the optional local repository path to install artifact.
* @param file the plugin JAR file.
* @param installForRegistration specify whether plugin artifacts should be scanned and registered.
* @param localRepositoryPath the optional local repository path to install artifact.
* @param forceInstallOnExistingVersions specify whether plugin should be forced install upon the existing one
* @return The URI of the installed plugin.
*/
PluginArtifact install(final File file,
boolean installForRegistration,
@Nullable Path localRepositoryPath);
@Nullable Path localRepositoryPath,
boolean forceInstallOnExistingVersions);
/**
* Installs the given plugin artifact.

View File

@@ -116,4 +116,11 @@ public interface PluginRegistry {
default void clear() {
}
/**
* Checks whether plugin-versioning is supported by this registry.
*
* @return {@code true} if supported. Otherwise {@code false}.
*/
boolean isVersioningSupported();
}

View File

@@ -18,9 +18,11 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -202,19 +204,13 @@ public class PluginScanner {
var guidesDirectory = classLoader.getResource("doc/guides");
if (guidesDirectory != null) {
try (var fileSystem = FileSystems.newFileSystem(guidesDirectory.toURI(), Collections.emptyMap())) {
var root = fileSystem.getPath("/doc/guides");
try (var stream = Files.walk(root, 1)) {
stream
.skip(1) // first element is the root element
.sorted(Comparator.comparing(path -> path.getName(path.getParent().getNameCount()).toString()))
.forEach(guide -> {
var guideName = guide.getName(guide.getParent().getNameCount()).toString();
guides.add(guideName.substring(0, guideName.lastIndexOf('.')));
});
}
try {
var root = Path.of(guidesDirectory.toURI());
addGuides(root, guides);
} catch (IOException | URISyntaxException e) {
// silently fail
} catch (FileSystemNotFoundException e) {
addGuidesThroughNewFileSystem(guidesDirectory, guides);
}
}
@@ -243,6 +239,27 @@ public class PluginScanner {
.build();
}
private static void addGuidesThroughNewFileSystem(URL guidesDirectory, List<String> guides) {
try (var fileSystem = FileSystems.newFileSystem(guidesDirectory.toURI(), Collections.emptyMap())) {
var root = fileSystem.getPath("doc/guides");
addGuides(root, guides);
} catch (IOException | URISyntaxException e) {
// silently fail
}
}
private static void addGuides(Path root, List<String> guides) throws IOException {
try (var stream = Files.walk(root, 1)) {
stream
.skip(1) // first element is the root element
.sorted(Comparator.comparing(path -> path.getName(path.getParent().getNameCount()).toString()))
.forEach(guide -> {
var guideName = guide.getName(guide.getParent().getNameCount()).toString();
guides.add(guideName.substring(0, guideName.lastIndexOf('.')));
});
}
}
public static Manifest getManifest(ClassLoader classLoader) {
try {
URL url = classLoader.getResource(JarFile.MANIFEST_NAME);

View File

@@ -86,7 +86,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
DeserializationContext context) throws IOException {
Class<? extends Plugin> pluginType = null;
final String identifier = extractPluginRawIdentifier(node);
final String identifier = extractPluginRawIdentifier(node, pluginRegistry.isVersioningSupported());
if (identifier != null) {
log.trace("Looking for Plugin for: {}",
identifier
@@ -103,7 +103,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
);
if (DataChart.class.isAssignableFrom(pluginType)) {
final Class<? extends Plugin> dataFilterClass = pluginRegistry.findClassByIdentifier(extractPluginRawIdentifier(node.get("data")));
final Class<? extends Plugin> dataFilterClass = pluginRegistry.findClassByIdentifier(extractPluginRawIdentifier(node.get("data"), pluginRegistry.isVersioningSupported()));
ParameterizedType genericDataFilterClass = (ParameterizedType) dataFilterClass.getGenericSuperclass();
Type dataFieldsEnum = genericDataFilterClass.getActualTypeArguments()[0];
TypeFactory typeFactory = JacksonMapper.ofJson().getTypeFactory();
@@ -142,7 +142,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
);
}
static String extractPluginRawIdentifier(final JsonNode node) {
static String extractPluginRawIdentifier(final JsonNode node, final boolean isVersioningSupported) {
String type = Optional.ofNullable(node.get(TYPE)).map(JsonNode::textValue).orElse(null);
String version = Optional.ofNullable(node.get(VERSION)).map(JsonNode::textValue).orElse(null);
@@ -150,6 +150,6 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
return null;
}
return version != null && !version.isEmpty() ? type + ":" + version : type;
return isVersioningSupported && version != null && !version.isEmpty() ? type + ":" + version : type;
}
}

View File

@@ -28,6 +28,7 @@ public interface QueueFactoryInterface {
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
QueueInterface<Execution> execution();
@@ -62,4 +63,6 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionResult> subflowExecutionResult();
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<ExecutionRunning> executionRunning();
}

View File

@@ -5,6 +5,7 @@ import io.kestra.core.models.Pauseable;
import io.kestra.core.utils.Either;
import java.io.Closeable;
import java.util.List;
import java.util.function.Consumer;
public interface QueueInterface<T> extends Closeable, Pauseable {
@@ -18,7 +19,15 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
emitAsync(null, message);
}
void emitAsync(String consumerGroup, T message) throws QueueException;
default void emitAsync(String consumerGroup, T message) throws QueueException {
emitAsync(consumerGroup, List.of(message));
}
default void emitAsync(List<T> messages) throws QueueException {
emitAsync(null, messages);
}
void emitAsync(String consumerGroup, List<T> messages) throws QueueException;
default void delete(T message) throws QueueException {
delete(null, message);

View File

@@ -27,8 +27,6 @@ public class QueueService {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == MetricEntry.class) {
return null;
} else if (object.getClass() == ExecutionRunning.class) {
return ((ExecutionRunning) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionEnd.class) {
return ((SubflowExecutionEnd) object).getParentExecutionId();
} else {

View File

@@ -0,0 +1,12 @@
package io.kestra.core.queues;
import java.io.Serial;
public class UnsupportedMessageException extends QueueException {
@Serial
private static final long serialVersionUID = 1L;
public UnsupportedMessageException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.utils.IdUtils;
import jakarta.validation.constraints.NotNull;
@@ -11,7 +12,7 @@ import lombok.With;
@Value
@AllArgsConstructor
@Builder
public class ExecutionRunning {
public class ExecutionRunning implements HasUID {
String tenantId;
@NotNull
@@ -26,6 +27,7 @@ public class ExecutionRunning {
@With
ConcurrencyState concurrencyState;
@Override
public String uid() {
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId, this.execution.getId());
}

View File

@@ -85,7 +85,8 @@ public class Executor {
}
public Boolean canBeProcessed() {
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null || this.getExecution().isDeleted() || this.getExecution().getState().isPaused());
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isQueued());
}
public Executor withFlow(FlowWithSource flow) {

View File

@@ -67,6 +67,9 @@ public class ExecutorService {
@Inject
private WorkerGroupExecutorInterface workerGroupExecutorInterface;
@Inject
private WorkerJobRunningStateStore workerJobRunningStateStore;
protected FlowMetaStoreInterface flowExecutorInterface;
@Inject
@@ -97,49 +100,39 @@ public class ExecutorService {
return this.flowExecutorInterface;
}
public Executor checkConcurrencyLimit(Executor executor, FlowInterface flow, Execution execution, long count) {
// if above the limit, handle concurrency limit based on its behavior
if (count >= flow.getConcurrency().getLimit()) {
public ExecutionRunning processExecutionRunning(FlowInterface flow, int runningCount, ExecutionRunning executionRunning) {
// if concurrency was removed, it can be null as we always get the latest flow definition
if (flow.getConcurrency() != null && runningCount >= flow.getConcurrency().getLimit()) {
return switch (flow.getConcurrency().getBehavior()) {
case QUEUE -> {
var newExecution = execution.withState(State.Type.QUEUED);
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.execution(newExecution)
.concurrencyState(ExecutionRunning.ConcurrencyState.QUEUED)
.build();
// when max concurrency is reached, we throttle the execution and stop processing
logService.logExecution(
newExecution,
executionRunning.getExecution(),
Level.INFO,
"Flow is queued due to concurrency limit exceeded, {} running(s)",
count
"Execution is queued due to concurrency limit exceeded, {} running(s)",
runningCount
);
// return the execution queued
yield executor
.withExecutionRunning(executionRunning)
.withExecution(newExecution, "checkConcurrencyLimit");
var newExecution = executionRunning.getExecution().withState(State.Type.QUEUED);
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
yield executionRunning
.withExecution(newExecution)
.withConcurrencyState(ExecutionRunning.ConcurrencyState.QUEUED);
}
case CANCEL ->
executor.withExecution(execution.withState(State.Type.CANCELLED), "checkConcurrencyLimit");
executionRunning
.withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
case FAIL ->
executor.withException(new IllegalStateException("Flow is FAILED due to concurrency limit exceeded"), "checkConcurrencyLimit");
executionRunning
.withExecution(executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded")).getExecution())
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
};
}
// if under the limit, update the executor with a RUNNING ExecutionRunning to track them
var executionRunning = new ExecutionRunning(
flow.getTenantId(),
flow.getNamespace(),
flow.getId(),
executor.getExecution(),
ExecutionRunning.ConcurrencyState.RUNNING
);
return executor.withExecutionRunning(executionRunning);
// if under the limit, run it!
return executionRunning
.withExecution(executionRunning.getExecution().withState(State.Type.RUNNING))
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
}
public Executor process(Executor executor) {
@@ -242,9 +235,9 @@ public class ExecutorService {
try {
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
} catch (Exception e) {
// This will lead to the next task being still executed but at least Kestra will not crash.
// This will lead to the next task being still executed, but at least Kestra will not crash.
// This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), e);
runContext.logger().error("Unable to resolve state from the Flowable task: {}", e.getMessage(), e);
state = Optional.of(State.Type.FAILED);
}
Optional<WorkerTaskResult> endedTask = childWorkerTaskTypeToWorkerTask(
@@ -592,6 +585,23 @@ public class ExecutorService {
list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId()))
.collect(Collectors.toCollection(ArrayList::new));
}
// If the task is a flowable and its terminated, check that all children are terminated.
// This may not be the case for parallel flowable tasks like Parallel, Dag, ForEach...
// After a fail task, some child flowable may not be correctly terminated.
if (task instanceof FlowableTask<?> && taskRun.getState().isTerminated()) {
List<TaskRun> updated = executor.getExecution().findChildren(taskRun).stream()
.filter(child -> !child.getState().isTerminated())
.map(throwFunction(child -> child.withState(taskRun.getState().getCurrent())))
.toList();
if (!updated.isEmpty()) {
Execution execution = executor.getExecution();
for (TaskRun child : updated) {
execution = execution.withTaskRun(child);
}
executor = executor.withExecution(execution, "handledTerminatedFlowableTasks");
}
}
}
metricRegistry
@@ -664,7 +674,7 @@ public class ExecutorService {
.taskRunId(workerTaskResult.getTaskRun().getId())
.executionId(executor.getExecution().getId())
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(duration != null ? duration : timeout))
.state(duration != null ? behavior.mapToState() : State.Type.FAILED)
.state(duration != null ? behavior.mapToState() : State.Type.fail(pauseTask))
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
}
@@ -732,6 +742,7 @@ public class ExecutorService {
List<TaskRun> afterExecutionNexts = FlowableUtils.resolveSequentialNexts(executor.getExecution(), afterExecutionResolvedTasks)
.stream()
.map(throwFunction(NextTaskRun::getTaskRun))
.map(taskRun -> taskRun.withForceExecution(true)) // forceExecution so it would be executed even if the execution is killed
.toList();
if (!afterExecutionNexts.isEmpty()) {
return executor.withTaskRun(afterExecutionNexts, "handleAfterExecution ");
@@ -1072,6 +1083,25 @@ public class ExecutorService {
newExecution = executionService.killParentTaskruns(taskRun, newExecution);
}
executor.withExecution(newExecution, "addWorkerTaskResult");
if (taskRun.getState().isTerminated()) {
log.trace("TaskRun terminated: {}", taskRun);
workerJobRunningStateStore.deleteByKey(taskRun.getId());
metricRegistry
.counter(
MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT,
MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_COUNT_DESCRIPTION,
metricRegistry.tags(workerTaskResult)
)
.increment();
metricRegistry
.timer(
MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION,
MetricRegistry.METRIC_EXECUTOR_TASKRUN_ENDED_DURATION_DESCRIPTION,
metricRegistry.tags(workerTaskResult)
)
.record(taskRun.getState().getDuration());
}
}
// Note: as the flow is only used in an error branch and it can take time to load, we pass it thought a Supplier

View File

@@ -286,18 +286,10 @@ public class FlowableUtils {
// start as many tasks as we have concurrency slots
return collect.values().stream()
.map(resolvedTasks -> filterCreated(resolvedTasks, taskRuns, parentTaskRun))
.map(resolvedTasks -> resolveSequentialNexts(execution, resolvedTasks, null, null, parentTaskRun))
.filter(resolvedTasks -> !resolvedTasks.isEmpty())
.limit(concurrencySlots)
.map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
.toList();
}
private static List<ResolvedTask> filterCreated(List<ResolvedTask> tasks, List<TaskRun> taskRuns, TaskRun parentTaskRun) {
return tasks.stream()
.filter(resolvedTask -> taskRuns.stream()
.noneMatch(taskRun -> FlowableUtils.isTaskRunFor(resolvedTask, taskRun, parentTaskRun))
)
.map(resolvedTasks -> resolvedTasks.getFirst())
.toList();
}

View File

@@ -29,7 +29,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
public class RunContextLogger implements Supplier<org.slf4j.Logger> {
private static final int MAX_MESSAGE_LENGTH = 1024 * 10;
private static final int MAX_MESSAGE_LENGTH = 1024 * 15;
public static final String ORIGINAL_TIMESTAMP_KEY = "originalTimestamp";
private final String loggerName;
@@ -80,7 +80,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
}
List<LogEntry> result = new ArrayList<>();
long i = 0;
for (String s : split) {
result.add(LogEntry.builder()
.namespace(logEntry.getNamespace())
@@ -98,7 +97,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
.thread(event.getThreadName())
.build()
);
i++;
}
return result;
@@ -331,14 +329,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
protected void append(ILoggingEvent e) {
e = this.transform(e);
logEntries(e, logEntry)
.forEach(l -> {
try {
logQueue.emitAsync(l);
} catch (QueueException ex) {
log.warn("Unable to emit logQueue", ex);
}
});
try {
logQueue.emitAsync(logEntries(e, logEntry));
} catch (QueueException ex) {
log.warn("Unable to emit logQueue", ex);
}
}
}

View File

@@ -4,15 +4,11 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -27,12 +23,19 @@ public class RunContextProperty<T> {
private final RunContext runContext;
private final Task task;
private final AbstractTrigger trigger;
private final boolean skipCache;
RunContextProperty(Property<T> property, RunContext runContext) {
this(property, runContext, false);
}
RunContextProperty(Property<T> property, RunContext runContext, boolean skipCache) {
this.property = property;
this.runContext = runContext;
this.task = ((DefaultRunContext) runContext).getTask();
this.trigger = ((DefaultRunContext) runContext).getTrigger();
this.skipCache = skipCache;
}
private void validate() {
@@ -45,6 +48,19 @@ public class RunContextProperty<T> {
log.trace("Unable to do validation: no task or trigger found");
}
}
/**
* Returns a new {@link RunContextProperty} that will always be rendered by evaluating
* its original Pebble expression, without using any previously cached value.
* <p>
* This ensures that each time the property is rendered, the underlying
* expression is re-evaluated to produce a fresh result.
*
* @return a new {@link Property} that bypasses the cache
*/
public RunContextProperty<T> skipCache() {
return new RunContextProperty<>(this.property, this.runContext, true);
}
/**
* Render a property then convert it to its target type and validate it.<br>
@@ -55,13 +71,13 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz)));
validate();
return as;
}
/**
* Render a property with additional variables, then convert it to its target type and validate it.<br>
*
@@ -71,7 +87,7 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz, variables)));
validate();
@@ -89,7 +105,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz)))
.orElse((T) Collections.emptyList());
@@ -108,7 +124,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz, variables)))
.orElse((T) Collections.emptyList());
@@ -127,7 +143,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass)))
.orElse((T) Collections.emptyMap());
@@ -146,11 +162,15 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass, variables)))
.orElse((T) Collections.emptyMap());
validate();
return as;
}
private Property<T> getProperty() {
return skipCache ? this.property.skipCache() : this.property;
}
}

View File

@@ -12,6 +12,7 @@ import io.kestra.core.models.flows.input.SecretInput;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.utils.ListUtils;
import io.kestra.plugin.core.trigger.Schedule;
import lombok.AllArgsConstructor;
import lombok.With;
@@ -27,6 +28,7 @@ import java.util.function.Consumer;
*/
public final class RunVariables {
public static final String SECRET_CONSUMER_VARIABLE_NAME = "addSecretConsumer";
public static final String FIXTURE_FILES_KEY = "io.kestra.datatype:test_fixtures_files";
/**
* Creates an immutable map representation of the given {@link Task}.
@@ -181,9 +183,6 @@ public final class RunVariables {
// Flow
if (flow != null) {
builder.put("flow", RunVariables.of(flow));
if (flow.getVariables() != null) {
builder.put("vars", flow.getVariables());
}
}
// Task
@@ -298,16 +297,19 @@ public final class RunVariables {
if (execution.getTrigger() != null && execution.getTrigger().getVariables() != null) {
builder.put("trigger", execution.getTrigger().getVariables());
// temporal hack to add back the `schedule`variables
// will be removed in 2.0
if (Schedule.class.getName().equals(execution.getTrigger().getType())) {
// add back its variables inside the `schedule` variables
builder.put("schedule", execution.getTrigger().getVariables());
}
}
if (execution.getLabels() != null) {
builder.put("labels", Label.toNestedMap(execution.getLabels()));
}
if (execution.getVariables() != null) {
builder.putAll(execution.getVariables());
}
if (flow == null) {
Flow flowFromExecution = Flow.builder()
.id(execution.getFlowId())
@@ -319,6 +321,20 @@ public final class RunVariables {
}
}
// variables
Optional.ofNullable(execution)
.map(Execution::getVariables)
.or(() -> Optional.ofNullable(flow).map(FlowInterface::getVariables))
.map(HashMap::new)
.ifPresent(variables -> {
Object fixtureFiles = variables.remove(FIXTURE_FILES_KEY);
builder.put("vars", ImmutableMap.copyOf(variables));
if (fixtureFiles != null) {
builder.put("files", fixtureFiles);
}
});
// Kestra configuration
if (kestraConfiguration != null) {
Map<String, String> kestra = HashMap.newHashMap(2);

View File

@@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
@@ -19,6 +18,7 @@ import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.server.*;
import io.kestra.core.services.LabelService;
import io.kestra.core.services.LogService;
import io.kestra.core.services.MaintenanceService;
import io.kestra.core.services.VariablesService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.storages.StorageContext;
@@ -158,6 +158,9 @@ public class Worker implements Service, Runnable, AutoCloseable {
private TracerFactory tracerFactory;
private Tracer tracer;
@Inject
private MaintenanceService maintenanceService;
/**
* Creates a new {@link Worker} instance.
*
@@ -285,8 +288,12 @@ public class Worker implements Service, Runnable, AutoCloseable {
));
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
if (this.maintenanceService.isInMaintenanceMode()) {
enterMaintenance();
} else {
setState(ServiceState.RUNNING);
}
setState(ServiceState.RUNNING);
if (workerGroupKey != null) {
log.info("Worker started with {} thread(s) in group '{}'", numThreads, workerGroupKey);
}
@@ -304,21 +311,25 @@ public class Worker implements Service, Runnable, AutoCloseable {
ClusterEvent clusterEvent = either.getLeft();
log.info("Cluster event received: {}", clusterEvent);
switch (clusterEvent.eventType()) {
case MAINTENANCE_ENTER -> {
this.executionKilledQueue.pause();
this.workerJobQueue.pause();
this.setState(ServiceState.MAINTENANCE);
}
case MAINTENANCE_EXIT -> {
this.executionKilledQueue.resume();
this.workerJobQueue.resume();
this.setState(ServiceState.RUNNING);
}
case MAINTENANCE_ENTER -> enterMaintenance();
case MAINTENANCE_EXIT -> exitMaintenance();
}
}
private void enterMaintenance() {
this.executionKilledQueue.pause();
this.workerJobQueue.pause();
this.setState(ServiceState.MAINTENANCE);
}
private void exitMaintenance() {
this.executionKilledQueue.resume();
this.workerJobQueue.resume();
this.setState(ServiceState.RUNNING);
}
private void setState(final ServiceState state) {
this.state.set(state);
Map<String, Object> properties = new HashMap<>();
@@ -395,11 +406,16 @@ public class Worker implements Service, Runnable, AutoCloseable {
} catch (IllegalVariableEvaluationException e) {
RunContextLogger contextLogger = runContextLoggerFactory.create(currentWorkerTask);
contextLogger.logger().error("Failed evaluating runIf: {}", e.getMessage(), e);
try {
this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.fail()));
} catch (QueueException ex) {
log.error("Unable to emit the worker task result for task {} taskrun {}", currentWorkerTask.getTask().getId(), currentWorkerTask.getTaskRun().getId(), e);
}
} catch (QueueException e) {
log.error("Unable to emit the worker task result for task {} taskrun {}", currentWorkerTask.getTask().getId(), currentWorkerTask.getTaskRun().getId(), e);
}
if (workerTaskResult.getTaskRun().getState().isFailed() && !currentWorkerTask.getTask().isAllowFailure()) {
if (workerTaskResult == null || workerTaskResult.getTaskRun().getState().isFailed() && !currentWorkerTask.getTask().isAllowFailure()) {
break;
}
@@ -624,7 +640,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
));
}
if (killedExecution.contains(workerTask.getTaskRun().getExecutionId())) {
if (! Boolean.TRUE.equals(workerTask.getTaskRun().getForceExecution()) && killedExecution.contains(workerTask.getTaskRun().getExecutionId())) {
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun().withState(KILLED));
try {
this.workerTaskResultQueue.emit(workerTaskResult);
@@ -685,6 +701,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
this.workerTaskResultQueue.emit(workerTaskResult);
return workerTaskResult;
} catch (QueueException e) {
@@ -695,6 +712,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
// If it's a message too big, we remove the outputs
failed = failed.withOutputs(Variables.empty());
}
if (e instanceof UnsupportedMessageException) {
// we expect the offending char is in the output so we remove it
failed = failed.withOutputs(Variables.empty());
}
WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
@@ -776,7 +797,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
if (!(workerTask.getTask() instanceof RunnableTask<?> task)) {
// This should never happen but better to deal with it than crashing the Worker
var state = workerTask.getTask().isAllowFailure() ? workerTask.getTask().isAllowWarning() ? SUCCESS : WARNING : FAILED;
var state = State.Type.fail(workerTask.getTask());
TaskRunAttempt attempt = TaskRunAttempt.builder()
.state(new io.kestra.core.models.flows.State().withState(state))
.workerId(this.id)

View File

@@ -0,0 +1,20 @@
package io.kestra.core.runners;
/**
* State store containing all workers' jobs in RUNNING state.
*
* @see WorkerJob
*/
public interface WorkerJobRunningStateStore {
/**
* Deletes a running worker job for the given key.
*
* <p>
* A key can be a {@link WorkerTask} Task Run ID.
* </p>
*
* @param key the key of the worker job to be deleted.
*/
void deleteByKey(String key);
}

View File

@@ -48,7 +48,7 @@ public class WorkerTask extends WorkerJob {
* @return this worker task, updated
*/
public TaskRun fail() {
var state = this.task.isAllowFailure() ? this.task.isAllowWarning() ? State.Type.SUCCESS : State.Type.WARNING : State.Type.FAILED;
var state = State.Type.fail(task);
return this.getTaskRun().withState(state);
}
}

View File

@@ -24,7 +24,7 @@ public class DateAddFilter extends AbstractDate implements Filter {
return null;
}
final Long amount = (Long) args.get("amount");
final Long amount = getAsLong(args.get("amount"), lineNumber, self);
final String unit = (String) args.get("unit");
final String timeZone = (String) args.get("timeZone");
final String existingFormat = (String) args.get("existingFormat");
@@ -36,4 +36,24 @@ public class DateAddFilter extends AbstractDate implements Filter {
return format(plus, args, context);
}
public static Long getAsLong(Object value, int lineNumber, PebbleTemplate self) {
if (value instanceof Long longValue) {
return longValue;
} else if (value instanceof Integer integerValue) {
return integerValue.longValue();
} else if (value instanceof Number numberValue) {
return numberValue.longValue();
} else if (value instanceof String stringValue) {
try {
return Long.parseLong(stringValue);
} catch (NumberFormatException e) {
throw new PebbleException(e, "%s can't be converted to long".formatted(stringValue),
lineNumber, self != null ? self.getName() : "Unknown");
}
}
throw new PebbleException(null, "Incorrect %s format, must be a number".formatted(value),
lineNumber, self != null ? self.getName() : "Unknown");
}
}

View File

@@ -30,10 +30,7 @@ import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.*;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.*;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.util.CollectionUtils;
@@ -86,12 +83,15 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private final LogService logService;
protected SchedulerExecutionStateInterface executionState;
private final WorkerGroupExecutorInterface workerGroupExecutorInterface;
private final MaintenanceService maintenanceService;
// must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
private volatile Boolean isReady = false;
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> scheduledFuture;
private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> executionMonitorFuture;
@Getter
protected SchedulerTriggerStateInterface triggerState;
@@ -136,6 +136,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.serviceStateEventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.executionEventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
this.workerGroupExecutorInterface = applicationContext.getBean(WorkerGroupExecutorInterface.class);
this.maintenanceService = applicationContext.getBean(MaintenanceService.class);
setState(ServiceState.CREATED);
}
@@ -150,7 +152,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.flowListeners.run();
this.flowListeners.listen(this::initializedTriggers);
ScheduledFuture<?> evaluationLoop = scheduleExecutor.scheduleAtFixedRate(
scheduledFuture = scheduleExecutor.scheduleAtFixedRate(
this::handle,
0,
1,
@@ -160,10 +162,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the evaluation loop thread
Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
() -> {
Await.until(evaluationLoop::isDone);
Await.until(scheduledFuture::isDone);
try {
evaluationLoop.get();
scheduledFuture.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -175,7 +177,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
);
// Periodically report metrics and logs of running executions
ScheduledFuture<?> monitoringLoop = executionMonitorExecutor.scheduleWithFixedDelay(
executionMonitorFuture = executionMonitorExecutor.scheduleWithFixedDelay(
this::executionMonitor,
30,
10,
@@ -185,10 +187,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the monitoring loop thread
Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
() -> {
Await.until(monitoringLoop::isDone);
Await.until(executionMonitorFuture::isDone);
try {
monitoringLoop.get();
executionMonitorFuture.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -289,8 +291,11 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// listen to cluster events
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(((QueueInterface<ClusterEvent>) clusterEventQueueInterface).receive(this::clusterEventQueue)));
setState(ServiceState.RUNNING);
if (this.maintenanceService.isInMaintenanceMode()) {
enterMaintenance();
} else {
setState(ServiceState.RUNNING);
}
log.info("Scheduler started");
}
@@ -399,31 +404,35 @@ public abstract class AbstractScheduler implements Scheduler, Service {
ClusterEvent clusterEvent = either.getLeft();
log.info("Cluster event received: {}", clusterEvent);
switch (clusterEvent.eventType()) {
case MAINTENANCE_ENTER -> {
this.executionQueue.pause();
this.triggerQueue.pause();
this.workerJobQueue.pause();
this.workerTriggerResultQueue.pause();
this.executionKilledQueue.pause();
this.pauseAdditionalQueues();
this.isPaused.set(true);
this.setState(ServiceState.MAINTENANCE);
}
case MAINTENANCE_EXIT -> {
this.executionQueue.resume();
this.triggerQueue.resume();
this.workerJobQueue.resume();
this.workerTriggerResultQueue.resume();
this.executionKilledQueue.resume();
this.resumeAdditionalQueues();
this.isPaused.set(false);
this.setState(ServiceState.RUNNING);
}
case MAINTENANCE_ENTER -> enterMaintenance();
case MAINTENANCE_EXIT -> exitMaintenance();
}
}
private void enterMaintenance() {
this.executionQueue.pause();
this.triggerQueue.pause();
this.workerJobQueue.pause();
this.workerTriggerResultQueue.pause();
this.executionKilledQueue.pause();
this.pauseAdditionalQueues();
this.isPaused.set(true);
this.setState(ServiceState.MAINTENANCE);
}
private void exitMaintenance() {
this.executionQueue.resume();
this.triggerQueue.resume();
this.workerJobQueue.resume();
this.workerTriggerResultQueue.resume();
this.executionKilledQueue.resume();
this.resumeAdditionalQueues();
this.isPaused.set(false);
this.setState(ServiceState.RUNNING);
}
protected void resumeAdditionalQueues() {
// by default: do nothing
}
@@ -996,6 +1005,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
}
setState(ServiceState.TERMINATING);
this.receiveCancellations.forEach(Runnable::run);
ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));
ExecutorsUtils.closeScheduledThreadPool(executionMonitorExecutor, Duration.ofSeconds(5), List.of(executionMonitorFuture));
try {
if (onClose != null) {
onClose.run();
@@ -1003,9 +1015,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
} catch (Exception e) {
log.error("Unexpected error while terminating scheduler.", e);
}
this.receiveCancellations.forEach(Runnable::run);
this.scheduleExecutor.shutdown();
this.executionMonitorExecutor.shutdown();
setState(ServiceState.TERMINATED_GRACEFULLY);
if (log.isDebugEnabled()) {

View File

@@ -5,6 +5,7 @@ import com.amazon.ion.IonSystem;
import com.amazon.ion.system.*;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
@@ -36,6 +37,8 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import static com.fasterxml.jackson.core.StreamReadConstraints.DEFAULT_MAX_STRING_LEN;
public final class JacksonMapper {
public static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
public static final TypeReference<List<Object>> LIST_TYPE_REFERENCE = new TypeReference<>() {};
@@ -43,6 +46,12 @@ public final class JacksonMapper {
private JacksonMapper() {}
static {
StreamReadConstraints.overrideDefaultStreamReadConstraints(
StreamReadConstraints.builder().maxNameLength(DEFAULT_MAX_STRING_LEN).build()
);
}
private static final ObjectMapper MAPPER = JacksonMapper.configure(
new ObjectMapper()
);
@@ -52,7 +61,7 @@ public final class JacksonMapper {
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
public static ObjectMapper ofJson() {
return MAPPER;
return JacksonMapper.ofJson(true);
}
public static ObjectMapper ofJson(boolean strict) {

View File

@@ -1,6 +1,7 @@
package io.kestra.core.server;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@@ -8,9 +9,11 @@ import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +28,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
protected final ServerConfig serverConfig;
private final AtomicBoolean isStopped = new AtomicBoolean(false);
private ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> scheduledFuture;
private Instant lastScheduledExecution;
/**
@@ -98,7 +102,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
Duration scheduleInterval = getScheduleInterval();
log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval);
scheduledExecutorService.scheduleAtFixedRate(
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
this,
0,
scheduleInterval.toSeconds(),
@@ -133,20 +137,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
@Override
public void close() {
if (isStopped.compareAndSet(false, true) && scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
if (scheduledExecutorService.isTerminated()) {
return;
}
try {
if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Timeout", name);
}
log.debug("Stopped scheduled '{}' task.", name);
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Interrupted.", name);
}
ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(scheduledFuture));
}
}
}

View File

@@ -111,7 +111,7 @@ public interface Service extends AutoCloseable {
* </pre>
*/
enum ServiceState {
CREATED(1, 2, 3), // 0
CREATED(1, 2, 3, 4, 9), // 0
RUNNING(2, 3, 4, 9), // 1
ERROR(4), // 2
DISCONNECTED(4, 7), // 3

View File

@@ -215,9 +215,10 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
stateLock.lock();
// Optional callback to be executed at the end.
Runnable returnCallback = null;
localServiceState = localServiceState(service);
try {
localServiceState = localServiceState(service);
if (localServiceState == null) {
return null; // service has been unregistered.
}
@@ -266,7 +267,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
// Update the local instance
this.serviceRegistry.register(localServiceState.with(remoteInstance));
} catch (Exception e) {
final ServiceInstance localInstance = localServiceState(service).instance();
final ServiceInstance localInstance = localServiceState.instance();
log.error("[Service id={}, type='{}', hostname='{}'] Failed to update state to {}. Error: {}",
localInstance.uid(),
localInstance.type(),
@@ -282,7 +283,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
returnCallback.run();
}
}
return localServiceState(service).instance();
return Optional.ofNullable(localServiceState(service)).map(LocalServiceState::instance).orElse(null);
}
private void mayDisableStateUpdate(final Service service, final ServiceInstance instance) {
@@ -336,9 +337,11 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
final Service service,
final ServiceInstance instance,
final boolean isLivenessEnabled) {
// Never shutdown STANDALONE server or WEB_SERVER service.
if (instance.server().type().equals(ServerInstance.Type.STANDALONE) ||
instance.is(ServiceType.WEBSERVER)) {
// Never shutdown STANDALONE server or WEBSERVER and INDEXER services.
if (ServerInstance.Type.STANDALONE.equals(instance.server().type()) ||
instance.is(ServiceType.INDEXER) ||
instance.is(ServiceType.WEBSERVER)
) {
// Force the RUNNING state.
return Optional.of(instance.state(Service.ServiceState.RUNNING, now, null));
}

View File

@@ -212,7 +212,16 @@ public class ExecutionService {
// We need to remove global error tasks and flowable error tasks if any
flow
.allErrorsWithChilds()
.allErrorsWithChildren()
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// We need to remove global finally tasks and flowable error tasks if any
flow
.allFinallyWithChildren()
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// We need to remove afterExecution tasks
ListUtils.emptyOnNull(flow.getAfterExecution())
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// Build and launch new execution
@@ -315,6 +324,32 @@ public class ExecutionService {
return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
}
public Execution changeTaskRunState(final Execution execution, Flow flow, String taskRunId, State.Type newState) throws Exception {
Execution newExecution = markAs(execution, flow, taskRunId, newState);
// if the execution was terminated, it could have executed errors/finally/afterExecutions, we must remove them as the execution will be restarted
if (execution.getState().isTerminated()) {
List<TaskRun> newTaskRuns = newExecution.getTaskRunList();
// We need to remove global error tasks and flowable error tasks if any
flow
.allErrorsWithChildren()
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// We need to remove global finally tasks and flowable error tasks if any
flow
.allFinallyWithChildren()
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// We need to remove afterExecution tasks
ListUtils.emptyOnNull(flow.getAfterExecution())
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
return newExecution.withTaskRunList(newTaskRuns);
} else {
return newExecution;
}
}
public Execution markAs(final Execution execution, FlowInterface flow, String taskRunId, State.Type newState) throws Exception {
return this.markAs(execution, flow, taskRunId, newState, null);
}

View File

@@ -176,7 +176,7 @@ public class FlowService {
previous :
FlowWithSource.of(flowToImport.toBuilder().revision(previous.getRevision() + 1).build(), source)
)
.orElseGet(() -> FlowWithSource.of(flowToImport, source).toBuilder().revision(1).build());
.orElseGet(() -> FlowWithSource.of(flowToImport, source).toBuilder().tenantId(tenantId).revision(1).build());
} else {
return maybeExisting
.map(previous -> repository().update(flow, previous))

View File

@@ -0,0 +1,14 @@
package io.kestra.core.services;
import jakarta.inject.Singleton;
@Singleton
public class MaintenanceService {
/**
* @return true if the cluster is in maintenance mode
*/
public boolean isInMaintenanceMode() {
// maintenance mode is an EE feature
return false;
}
}

View File

@@ -168,18 +168,15 @@ public class PluginDefaultService {
try {
return this.injectAllDefaults(flow, false);
} catch (Exception e) {
RunContextLogger
.logEntries(
Execution.loggingEventFromException(e),
LogEntry.of(execution)
)
.forEach(logEntry -> {
try {
logQueue.emitAsync(logEntry);
} catch (QueueException e1) {
// silently do nothing
}
});
try {
logQueue.emitAsync(RunContextLogger
.logEntries(
Execution.loggingEventFromException(e),
LogEntry.of(execution)
));
} catch (QueueException e1) {
// silently do nothing
}
return readWithoutDefaultsOrThrow(flow);
}
}

View File

@@ -5,16 +5,19 @@ import io.kestra.core.test.TestState;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.net.URI;
import java.util.List;
public record UnitTestResult(
@NotNull
String unitTestId,
String testId,
@NotNull
String unitTestType,
String testType,
@NotNull
String executionId,
@NotNull
URI url,
@NotNull
TestState state,
@NotNull
List<AssertionResult> assertionResults,
@@ -22,14 +25,13 @@ public record UnitTestResult(
List<AssertionRunError> errors,
Fixtures fixtures
) {
public static UnitTestResult of(String unitTestId, String unitTestType, String executionId, List<AssertionResult> results, List<AssertionRunError> errors, @Nullable Fixtures fixtures) {
public static UnitTestResult of(String unitTestId, String unitTestType, String executionId, URI url, List<AssertionResult> results, List<AssertionRunError> errors, @Nullable Fixtures fixtures) {
TestState state;
if(!errors.isEmpty()){
state = TestState.ERROR;
} else {
state = results.stream().anyMatch(assertion -> !assertion.isSuccess()) ? TestState.FAILED : TestState.SUCCESS;
}
return new UnitTestResult(unitTestId, unitTestType, executionId, state, results, errors, fixtures);
return new UnitTestResult(unitTestId, unitTestType, executionId, url, state, results, errors, fixtures);
}
}

View File

@@ -3,12 +3,16 @@ package io.kestra.core.utils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.*;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class ExecutorsUtils {
@Inject
private ThreadMainFactoryBuilder threadFactoryBuilder;
@@ -61,6 +65,29 @@ public class ExecutorsUtils {
);
}
public static void closeScheduledThreadPool(ScheduledExecutorService scheduledExecutorService, Duration gracePeriod, List<ScheduledFuture<?>> taskFutures) {
scheduledExecutorService.shutdown();
if (scheduledExecutorService.isTerminated()) {
return;
}
try {
if (!scheduledExecutorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
log.warn("Failed to shutdown the ScheduledThreadPoolExecutor during grace period, forcing it to shutdown now");
// Ensure the scheduled task reaches a terminal state to avoid possible memory leak
ListUtils.emptyOnNull(taskFutures).forEach(taskFuture -> taskFuture.cancel(true));
scheduledExecutorService.shutdownNow();
}
log.debug("Stopped scheduled ScheduledThreadPoolExecutor.");
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
log.debug("Failed to shutdown the ScheduledThreadPoolExecutor.");
}
}
private ExecutorService wrap(String name, ExecutorService executorService) {
return ExecutorServiceMetrics.monitor(
meterRegistry,

View File

@@ -73,7 +73,7 @@ public class GraphUtils {
)))
.orElse(Collections.emptyMap());
triggersDeclarations.forEach(trigger -> {
triggersDeclarations.stream().filter(trigger -> trigger != null).forEach(trigger -> {
GraphTrigger triggerNode = new GraphTrigger(trigger, triggersById.get(trigger.getId()));
triggerCluster.addNode(triggerNode);
triggerCluster.addEdge(triggerCluster.getRoot(), triggerNode, new Relation());

View File

@@ -1,5 +1,7 @@
package io.kestra.core.utils;
import io.kestra.core.models.Setting;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.micronaut.context.env.Environment;
import io.micronaut.context.env.PropertiesPropertySourceLoader;
import io.micronaut.context.env.PropertySource;
@@ -29,6 +31,9 @@ public class VersionProvider {
@Inject
private Environment environment;
@Inject
private Optional<SettingRepositoryInterface> settingRepository; // repositories are not always there on unit tests
@PostConstruct
public void start() {
final Optional<PropertySource> gitProperties = new PropertiesPropertySourceLoader()
@@ -40,6 +45,18 @@ public class VersionProvider {
this.revision = loadRevision(gitProperties);
this.date = loadTime(gitProperties);
this.version = loadVersion(buildProperties, gitProperties);
// check the version in the settings and update if needed, we did't use it would allow us to detect incompatible update later if needed
if (settingRepository.isPresent()) {
Optional<Setting> versionSetting = settingRepository.get().findByKey(Setting.INSTANCE_VERSION);
if (versionSetting.isEmpty() || !versionSetting.get().getValue().equals(this.version)) {
settingRepository.get().save(Setting.builder()
.key(Setting.INSTANCE_VERSION)
.value(this.version)
.build()
);
}
}
}
private String loadVersion(final Optional<PropertySource> buildProperties,

View File

@@ -90,7 +90,7 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
private static final String OUTPUTS_VAR = "outputs";
@NotNull
private Property<String> expression;
private Property<Boolean> expression;
/** {@inheritDoc} **/
@SuppressWarnings("unchecked")
@@ -105,9 +105,8 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
conditionContext.getVariables(),
Map.of(TRIGGER_VAR, Map.of(OUTPUTS_VAR, conditionContext.getExecution().getOutputs()))
);
String render = conditionContext.getRunContext().render(expression).as(String.class, variables).orElseThrow();
return !(render.isBlank() || render.trim().equals("false"));
return conditionContext.getRunContext().render(expression).skipCache().as(Boolean.class, variables).orElseThrow();
}
private boolean hasNoOutputs(final Execution execution) {

View File

@@ -111,8 +111,9 @@ public class Labels extends Task implements ExecutionUpdatableTask {
})
).collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue
));
Map.Entry::getValue,
(first, second) -> second)
);
} else if (labels instanceof Map<?, ?> map) {
labelsAsMap = map.entrySet()
.stream()

View File

@@ -0,0 +1,75 @@
package io.kestra.plugin.core.execution;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ExecutionUpdatableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.Map;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Allow to set execution variables. These variables will then be available via the `{{ vars.name }}` expression."
)
@Plugin(
examples = {
@Example(
full = true,
title = "Set variables",
code = """
id: variables
namespace: company.team
variables:
name: World
tasks:
- id: set_vars
type: io.kestra.plugin.core.execution.SetVariables
variables:
message: Hello
name: Loïc
- id: hello
type: io.kestra.plugin.core.log.Log
message: "{{ vars.message }} {{ vars.name }}\""""
)
}
)
public class SetVariables extends Task implements ExecutionUpdatableTask {
@Schema(title = "The variables")
@NotNull
private Property<Map<String, Object>> variables;
@Schema(title = "Whether to overwrite existing variables")
@NotNull
@Builder.Default
private Property<Boolean> overwrite = Property.ofValue(true);
@Override
public Execution update(Execution execution, RunContext runContext) throws Exception {
Map<String, Object> renderedVars = runContext.render(this.variables).asMap(String.class, Object.class);
boolean renderedOverwrite = runContext.render(overwrite).as(Boolean.class).orElseThrow();
if (!renderedOverwrite) {
// check that none of the new variables already exist
List<String> duplicated = renderedVars.keySet().stream().filter(key -> execution.getVariables().containsKey(key)).toList();
if (!duplicated.isEmpty()) {
throw new IllegalArgumentException("`overwrite` is set to false and the following variables already exist: " + String.join(",", duplicated));
}
}
return execution.withVariables(MapUtils.merge(execution.getVariables(), renderedVars));
}
}

View File

@@ -0,0 +1,89 @@
package io.kestra.plugin.core.execution;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ExecutionUpdatableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.Map;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Allow to unset execution variables."
)
@Plugin(
examples = {
@Example(
full = true,
title = "Set and later unset variables",
code = """
id: variables
namespace: company.team
variables:
name: World
tasks:
- id: set_vars
type: io.kestra.plugin.core.execution.SetVariables
variables:
message: Hello
name: Loïc
- id: hello
type: io.kestra.plugin.core.log.Log
message: "{{ vars.message }} {{ vars.name }}"
- id: unset_variables
type: io.kestra.plugin.core.execution.UnsetVariables
variables:
- message
- name"""
)
}
)
public class UnsetVariables extends Task implements ExecutionUpdatableTask {
@Schema(title = "The variables")
@NotNull
private Property<List<String>> variables;
@Schema(title = "Whether to ignore missing variables")
@NotNull
@Builder.Default
private Property<Boolean> ignoreMissing = Property.ofValue(false);
@Override
public Execution update(Execution execution, RunContext runContext) throws Exception {
List<String> renderedVariables = runContext.render(variables).asList(String.class);
boolean renderedIgnoreMissing = runContext.render(ignoreMissing).as(Boolean.class).orElseThrow();
Map<String, Object> variables = execution.getVariables();
for (String key : renderedVariables) {
removeVar(variables, key, renderedIgnoreMissing);
}
return execution.withVariables(variables);
}
private void removeVar(Map<String, Object> vars, String key, boolean ignoreMissing) {
if (key.indexOf('.') >= 0) {
String prefix = key.substring(0, key.indexOf('.'));
String suffix = key.substring(key.indexOf('.') + 1);
removeVar((Map<String, Object>) vars.get(prefix), suffix, ignoreMissing);
} else {
if (ignoreMissing && !vars.containsKey(key)) {
return;
}
vars.remove(key);
}
}
}

View File

@@ -555,7 +555,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
builder.uri(uri);
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
var state = this.isAllowFailure() ? State.Type.WARNING : State.Type.FAILED;
var state = State.Type.fail(this);
taskRun = taskRun
.withState(state)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))

View File

@@ -29,8 +29,13 @@ import java.util.concurrent.TimeUnit;
@Plugin(
examples = {
@Example(
full = true,
code = """
id: sleep
id: sleep
namespace: company.team
tasks:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: "PT5S"
"""

View File

@@ -208,48 +208,50 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
return Optional.empty();
}
boolean isOutputsAllowed = runContext
.<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
.orElse(true);
final Output.OutputBuilder builder = Output.builder()
.executionId(execution.getId())
.state(execution.getState().getCurrent());
final Map<String, Object> subflowOutputs = Optional
.ofNullable(flow.getOutputs())
.map(outputs -> outputs
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
)
)
.orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
VariablesService variablesService = ((DefaultRunContext) runContext).getApplicationContext().getBean(VariablesService.class);
if (subflowOutputs != null) {
try {
Map<String, Object> outputs = runContext.render(subflowOutputs);
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
if (flow.getOutputs() != null && flowInputOutput != null) {
outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
}
builder.outputs(outputs);
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
var state = this.isAllowFailure() ? this.isAllowWarning() ? State.Type.SUCCESS : State.Type.WARNING : State.Type.FAILED;
Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
taskRun = taskRun
.withState(state)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
.withOutputs(variables);
if (this.wait) { // we only compute outputs if we wait for the subflow
boolean isOutputsAllowed = runContext
.<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
.orElse(true);
return Optional.of(SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(State.Type.FAILED)
.parentTaskRun(taskRun)
.build());
final Map<String, Object> subflowOutputs = Optional
.ofNullable(flow.getOutputs())
.map(outputs -> outputs
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
)
)
.orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
if (subflowOutputs != null) {
try {
Map<String, Object> outputs = runContext.render(subflowOutputs);
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
if (flow.getOutputs() != null && flowInputOutput != null) {
outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
}
builder.outputs(outputs);
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
var state = State.Type.fail(this);
Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
taskRun = taskRun
.withState(state)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
.withOutputs(variables);
return Optional.of(SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(State.Type.FAILED)
.parentTaskRun(taskRun)
.build());
}
}
}

View File

@@ -9,11 +9,11 @@ import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.codehaus.commons.nullanalysis.NotNull;
import java.util.NoSuchElementException;

View File

@@ -82,12 +82,12 @@ import java.util.stream.Stream;
code = """
id: daily_flow
namespace: company.team
tasks:
- id: log
type: io.kestra.plugin.core.log.Log
message: It's {{ trigger.date ?? taskrun.startDate | date("HH:mm") }}
triggers:
- id: schedule
type: io.kestra.plugin.core.trigger.Schedule
@@ -437,13 +437,6 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
Optional.empty()
);
execution = execution.toBuilder()
// keep to avoid breaking compatibility
.variables(ImmutableMap.of(
"schedule", execution.getTrigger().getVariables()
))
.build();
return Optional.of(execution);
}

View File

@@ -56,7 +56,8 @@ public class OverrideRetryInterceptor implements MethodInterceptor<Object, Objec
retry.get("delay", Duration.class).orElse(Duration.ofSeconds(1)),
retry.get("maxDelay", Duration.class).orElse(null),
new DefaultRetryPredicate(resolveIncludes(retry, "includes"), resolveIncludes(retry, "excludes")),
Throwable.class
Throwable.class,
0
);
MutableConvertibleValues<Object> attrs = context.getAttributes();

View File

@@ -49,7 +49,6 @@ class DocumentationGeneratorTest {
assertThat(render).contains("description: \"Short description for this task\"");
assertThat(render).contains("`VALUE_1`");
assertThat(render).contains("`VALUE_2`");
assertThat(render).contains("This plugin is exclusively available on the Cloud and Enterprise editions of Kestra.");
}
@SuppressWarnings({"rawtypes", "unchecked"})

View File

@@ -112,7 +112,7 @@ class JsonSchemaGeneratorTest {
var requiredWithDefault = definitions.get("io.kestra.core.docs.JsonSchemaGeneratorTest-RequiredWithDefault");
assertThat(requiredWithDefault, is(notNullValue()));
assertThat((List<String>) requiredWithDefault.get("required"), not(contains("requiredWithDefault")));
assertThat((List<String>) requiredWithDefault.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
var properties = (Map<String, Map<String, Object>>) flow.get("properties");
var listeners = properties.get("listeners");
@@ -253,7 +253,7 @@ class JsonSchemaGeneratorTest {
void requiredAreRemovedIfThereIsADefault() {
Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, RequiredWithDefault.class);
assertThat(generate, is(not(nullValue())));
assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault")));
assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
assertThat((List<String>) generate.get("required"), containsInAnyOrder("requiredWithNoDefault"));
}
@@ -466,6 +466,11 @@ class JsonSchemaGeneratorTest {
@Builder.Default
private Property<TaskWithEnum.TestClass> requiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test").build());
@PluginProperty
@NotNull
@Builder.Default
private Property<TaskWithEnum.TestClass> anotherRequiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test2").build());
@PluginProperty
@NotNull
private Property<TaskWithEnum.TestClass> requiredWithNoDefault;

View File

@@ -1,6 +1,7 @@
package io.kestra.core.http.client;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.net.HttpHeaders;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
@@ -262,6 +263,30 @@ class HttpClientTest {
}
}
@Test
void postCustomObject_WithUnknownResponseField() throws IllegalVariableEvaluationException, HttpClientException, IOException {
CustomObject test = CustomObject.builder()
.id(IdUtils.create())
.name("test")
.build();
Map<String, String> withAdditionalField = JacksonMapper.ofJson().convertValue(test, new TypeReference<>() {
});
withAdditionalField.put("foo", "bar");
try (HttpClient client = client()) {
HttpResponse<CustomObject> response = client.request(
HttpRequest.of(URI.create(embeddedServerUri + "/http/json-post"), "POST", HttpRequest.JsonRequestBody.builder().content(withAdditionalField).build()),
CustomObject.class
);
assertThat(response.getStatus().getCode()).isEqualTo(200);
assertThat(response.getBody().id).isEqualTo(test.id);
assertThat(response.getHeaders().firstValue(HttpHeaders.CONTENT_TYPE).orElseThrow()).isEqualTo(MediaType.APPLICATION_JSON);
}
}
@Test
void postMultipart() throws IOException, URISyntaxException, IllegalVariableEvaluationException, HttpClientException {
Map<String, Object> multipart = Map.of(
@@ -509,4 +534,4 @@ class HttpClientTest {
String id;
String name;
}
}
}

View File

@@ -1,11 +1,12 @@
package io.kestra.core.models;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class LabelTest {
@Test
@@ -15,9 +16,8 @@ class LabelTest {
new Label(Label.CORRELATION_ID, "id"))
);
Assertions.assertEquals(
Map.of("system", Map.of("username", "test", "correlationId", "id")),
result
assertThat(result).isEqualTo(
Map.of("system", Map.of("username", "test", "correlationId", "id"))
);
}
@@ -29,9 +29,48 @@ class LabelTest {
new Label(Label.CORRELATION_ID, "id"))
);
Assertions.assertEquals(
Map.of("system", Map.of("username", "test1", "correlationId", "id")),
result
assertThat(result).isEqualTo(
Map.of("system", Map.of("username", "test2", "correlationId", "id"))
);
}
@Test
void shouldGetMapGivenDistinctLabels() {
Map<String, String> result = Label.toMap(List.of(
new Label(Label.USERNAME, "test"),
new Label(Label.CORRELATION_ID, "id"))
);
assertThat(result).isEqualTo(
Map.of(Label.USERNAME, "test", Label.CORRELATION_ID, "id")
);
}
@Test
void shouldGetMapGivenDuplicateLabels() {
Map<String, String> result = Label.toMap(List.of(
new Label(Label.USERNAME, "test1"),
new Label(Label.USERNAME, "test2"),
new Label(Label.CORRELATION_ID, "id"))
);
assertThat(result).isEqualTo(
Map.of(Label.USERNAME, "test2", Label.CORRELATION_ID, "id")
);
}
@Test
void shouldDuplicateLabelsWithKeyOrderKept() {
List<Label> result = Label.deduplicate(List.of(
new Label(Label.USERNAME, "test1"),
new Label(Label.USERNAME, "test2"),
new Label(Label.CORRELATION_ID, "id"),
new Label(Label.USERNAME, "test3"))
);
assertThat(result).containsExactly(
new Label(Label.USERNAME, "test3"),
new Label(Label.CORRELATION_ID, "id")
);
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.core.models.executions;
import io.kestra.core.models.Label;
import io.kestra.core.utils.IdUtils;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.flows.State;
@@ -157,7 +158,57 @@ class ExecutionTest {
.labels(List.of(new Label("test", "test-value")))
.build();
assertThat(execution.getLabels().size()).isEqualTo(1);
assertThat(execution.getLabels().getFirst()).isEqualTo(new Label("test", "test-value"));
assertThat(execution.getLabels()).containsExactly(new Label("test", "test-value"));
}
@Test
void labelsGetDeduplicated() {
final List<Label> duplicatedLabels = List.of(
new Label("test", "value1"),
new Label("test", "value2")
);
final Execution executionWithLabels = Execution.builder()
.build()
.withLabels(duplicatedLabels);
assertThat(executionWithLabels.getLabels()).containsExactly(new Label("test", "value2"));
final Execution executionBuilder = Execution.builder()
.labels(duplicatedLabels)
.build();
assertThat(executionBuilder.getLabels()).containsExactly(new Label("test", "value2"));
}
@Test
@Disabled("Solve label deduplication on instantization")
void labelsGetDeduplicatedOnNewInstance() {
final List<Label> duplicatedLabels = List.of(
new Label("test", "value1"),
new Label("test", "value2")
);
final Execution executionNew = new Execution(
"foo",
"id",
"namespace",
"flowId",
1,
Collections.emptyList(),
Map.of(),
Map.of(),
duplicatedLabels,
Map.of(),
State.of(State.Type.SUCCESS, Collections.emptyList()),
"parentId",
"originalId",
null,
false,
null,
null,
null,
null,
null
);
assertThat(executionNew.getLabels()).containsExactly(new Label("test", "value2"));
}
}

View File

@@ -26,7 +26,7 @@ class MultiselectInputTest {
MultiselectInput input = MultiselectInput
.builder()
.id("id")
.expression("{{ values }}")
.expression("{{ values }}\n")
.build();
// When
Input<?> renderInput = RenderableInput.mayRenderInput(input, s -> {
@@ -60,4 +60,4 @@ class MultiselectInputTest {
// Then
Assertions.assertEquals(((MultiselectInput)renderInput).getValues(), List.of("1", "2"));
}
}
}

View File

@@ -26,7 +26,7 @@ class SelectInputTest {
SelectInput input = SelectInput
.builder()
.id("id")
.expression("{{ values }}")
.expression("{{ values }}\n")
.build();
// When
Input<?> renderInput = RenderableInput.mayRenderInput(input, s -> {
@@ -60,4 +60,4 @@ class SelectInputTest {
// Then
Assertions.assertEquals(((SelectInput)renderInput).getValues(), List.of("1", "2"));
}
}
}

View File

@@ -68,6 +68,31 @@ class ScriptServiceTest {
}
}
@Test
void replaceInternalStorageUnicode() throws IOException {
var runContext = runContextFactory.of();
Path path = Path.of("/tmp/unittest/main/file-龍.txt");
if (!path.toFile().exists()) {
Files.createFile(path);
}
String internalStorageUri = "kestra://some/file-龍.txt";
File localFile = null;
try {
var command = ScriptService.replaceInternalStorage(runContext, "my command with an internal storage file: " + internalStorageUri, false);
Matcher matcher = COMMAND_PATTERN_CAPTURE_LOCAL_PATH.matcher(command);
assertThat(matcher.matches()).isTrue();
Path absoluteLocalFilePath = Path.of(matcher.group(1));
localFile = absoluteLocalFilePath.toFile();
assertThat(localFile.exists()).isTrue();
} finally {
localFile.delete();
path.toFile().delete();
}
}
@Test
void uploadInputFiles() throws IOException {
var runContext = runContextFactory.of();

View File

@@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import io.kestra.core.models.Plugin;
import io.kestra.core.plugins.PluginRegistry;
@@ -15,12 +16,14 @@ import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
@ExtendWith(MockitoExtension.class)
class PluginDeserializerTest {
@Mock
private PluginRegistry registry;
@Test
void shouldSucceededDeserializePluginGivenValidType() throws JsonProcessingException {
// Given
@@ -38,8 +41,9 @@ class PluginDeserializerTest {
TestPluginHolder deserialized = om.readValue(input, TestPluginHolder.class);
// Then
Assertions.assertEquals(TestPlugin.class.getCanonicalName(), deserialized.plugin().getType());
Mockito.verify(registry, Mockito.only()).findClassByIdentifier(identifier);
assertThat(TestPlugin.class.getCanonicalName()).isEqualTo(deserialized.plugin().getType());
Mockito.verify(registry, Mockito.times(1)).isVersioningSupported();
Mockito.verify(registry, Mockito.times(1)).findClassByIdentifier(identifier);
}
@Test
@@ -57,17 +61,33 @@ class PluginDeserializerTest {
});
// Then
Assertions.assertEquals("io.kestra.core.plugins.serdes.Unknown", exception.getTypeId());
assertThat("io.kestra.core.plugins.serdes.Unknown").isEqualTo(exception.getTypeId());
}
@Test
void shouldReturnNullPluginIdentifierGivenNullType() {
Assertions.assertNull(PluginDeserializer.extractPluginRawIdentifier(new TextNode(null)));
assertThat(PluginDeserializer.extractPluginRawIdentifier(new TextNode(null), true)).isNull();
}
@Test
void shouldReturnNullPluginIdentifierGivenEmptyType() {
Assertions.assertNull(PluginDeserializer.extractPluginRawIdentifier(new TextNode("")));
assertThat(PluginDeserializer.extractPluginRawIdentifier(new TextNode(""), true)).isNull();
}
@Test
void shouldReturnTypeWithVersionGivenSupportedVersionTrue() {
ObjectNode jsonNodes = new ObjectNode(new ObjectMapper().getNodeFactory());
jsonNodes.set("type", new TextNode("io.kestra.core.plugins.serdes.Unknown"));
jsonNodes.set("version", new TextNode("1.0.0"));
assertThat(PluginDeserializer.extractPluginRawIdentifier(jsonNodes, true)).isEqualTo("io.kestra.core.plugins.serdes.Unknown:1.0.0");
}
@Test
void shouldReturnTypeWithVersionGivenSupportedVersionFalse() {
ObjectNode jsonNodes = new ObjectNode(new ObjectMapper().getNodeFactory());
jsonNodes.set("type", new TextNode("io.kestra.core.plugins.serdes.Unknown"));
jsonNodes.set("version", new TextNode("1.0.0"));
assertThat(PluginDeserializer.extractPluginRawIdentifier(jsonNodes, false)).isEqualTo("io.kestra.core.plugins.serdes.Unknown");
}
public record TestPluginHolder(Plugin plugin) {

Some files were not shown because too many files have changed in this diff Show More