Compare commits

...

203 Commits

Author SHA1 Message Date
github-actions[bot]
3cf1dfda69 chore(version): update to version '0.24.16' 2025-12-09 10:03:37 +00:00
Roman Acevedo
0ed3da6b61 ci: add missing GH_PERSONAL_TOKEN in main-build.yml 2025-12-09 10:53:47 +01:00
Roman Acevedo
ef8ae0bc46 ci: add GH_PERSONAL_TOKEN in release-docker.yml CI for helm chart 2025-12-09 10:15:37 +01:00
brian.mulier
03fc7d89d4 fix(core): deprecate Await util (#13369)
This reverts commit 9fa94deba9.
2025-12-04 11:26:12 +01:00
brian.mulier
5c4ab33de6 fix(executions): avoid Executor being stuck / slow startup due to flow init 2025-12-01 19:19:06 +01:00
brian.mulier
9cfdcc7cce fix(tests): use another db name on webserver to avoid colliding with repositories 2025-12-01 18:31:45 +01:00
brian.mulier
3a9dec0a15 refactor(core): rename Await.until(sleep) and (timeout) to avoid confusion 2025-12-01 18:30:53 +01:00
github-actions[bot]
fb0de0c0ae chore(version): update to version '0.24.15' 2025-11-18 13:09:41 +00:00
Loïc Mathieu
c189d97e07 fix(core): compilation issue 2025-11-17 15:10:34 +01:00
Loïc Mathieu
a723a35c88 fix(flow): flow trigger with both conditions and preconditions
When a flow has both a condition and a precondition, the condition was evaluated twice, which led to the execution being triggered twice.

Fixes
2025-11-14 18:09:37 +01:00
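
A minimal sketch of the fixed behavior, with illustrative names rather than Kestra's actual trigger API: the condition set is evaluated exactly once per event, so an event matching both the condition and the precondition can only fire a single execution.

// Hypothetical sketch: evaluate a flow trigger's conditions once per event.
import java.util.List;
import java.util.function.Predicate;

final class TriggerEvaluation<E> {
    private final List<Predicate<E>> conditions;
    private final List<Predicate<E>> preconditions;

    TriggerEvaluation(List<Predicate<E>> conditions, List<Predicate<E>> preconditions) {
        this.conditions = conditions;
        this.preconditions = preconditions;
    }

    // Each predicate is tested a single time; the combined result decides whether
    // one (and only one) execution is triggered for this event.
    boolean shouldTrigger(E event) {
        return conditions.stream().allMatch(c -> c.test(event))
            && preconditions.stream().allMatch(p -> p.test(event));
    }
}
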
Loïc Mathieu
b3f423d158 fix(flow): don't URLEncode the fileName inside the Download task
Also provide a `fileName` property that, when set, overrides any filename from the content disposition in case it causes issues.
2025-11-13 11:11:33 +01:00
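
A small sketch of the filename resolution described above, assuming the task exposes an optional `fileName` property (the helper and its signature are illustrative, not the actual task code):

final class FileNameResolver {
    // Hypothetical resolution order: explicit property first, then the
    // Content-Disposition filename taken verbatim (no URL-encoding), then a fallback.
    static String resolve(String fileNameProperty, String contentDispositionName, String fallback) {
        if (fileNameProperty != null && !fileNameProperty.isBlank()) {
            return fileNameProperty; // user override for when the header value causes issues
        }
        if (contentDispositionName != null) {
            return contentDispositionName; // used as-is, not URL-encoded
        }
        return fallback;
    }
}
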
Loïc Mathieu
a46429cc09 fix(system): access log configuration
Due to a change in the configuration file, access log configuration was in the wrong sub-document.

Fixes https://github.com/kestra-io/kestra-ee/issues/5670
2025-11-12 15:08:34 +01:00
github-actions[bot]
4af50e31cc chore(version): update to version '0.24.14' 2025-11-10 14:32:17 +00:00
Loïc Mathieu
a754b749f8 fix(system): trigger an execution once per condition on flow triggers
Fixes #12560
2025-11-05 15:34:08 +01:00
github-actions[bot]
547731a2a6 chore(version): update to version '0.24.13' 2025-11-03 11:12:22 +00:00
Roman Acevedo
18570b7512 ci: add dry run and default plugin-version for release docker 2025-10-31 20:18:05 +01:00
Loïc Mathieu
a1e3b912b3 fix(executions): Flow triggered twice when there are multiple conditions
Fixes #12560
2025-10-31 16:26:44 +01:00
Dnyanesh Pise
c04b9a4f17 fix(ui): prevent marking fields as error on login (Fix #12548) (#12554) 2025-10-30 23:38:55 +05:30
Loïc Mathieu
65395feeab fix(executions): set the execution to KILLING and not RESTARTED when killing a paused flow
Fixes https://github.com/kestra-io/kestra/issues/12417
2025-10-30 18:14:20 +01:00
Loïc Mathieu
d6e172f715 chore(deps): fix OpenTelemetry proto so it works with Protobuf 3
Fixes https://github.com/kestra-io/kestra/issues/12298
2025-10-30 15:49:25 +01:00
Hemant M Mehta
8a2271b587 fix(executions): jq-filter-zip-exception
closes: #11683
2025-10-30 12:58:21 +01:00
Your Name
0c89ba25e9 fix(core): handle integer size in chunk Pebble filter 2025-10-29 12:37:46 +01:00
Naveen Gowda MY
5a01961c55 fix(core): add error feedback and validation (#12472) 2025-10-29 16:12:59 +05:30
brian-mulier-p
96355b96f9 fix(core): show tasks in JSON Schema for Switch.cases (#12478)
part of #10508
2025-10-29 11:01:38 +01:00
Roman Acevedo
5b676409a4 ci: add skip test param to pre-release.yml 2025-10-28 17:55:05 +01:00
brian.mulier
8af1bdc2c6 chore(version): update to version '0.24.12' 2025-10-28 14:35:55 +01:00
Loïc Mathieu
de9b4f0f30 fix(executions): remove errors and finally tasks when restarting
Otherwise we would detect that an error or a finally branch is processing, and the flowable state would not be correctly computed.

Moreover, it prevents this branch from being taken again after a restart.

Fixes #11731
2025-10-28 14:30:50 +01:00
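
A minimal sketch of the idea with illustrative types (not the actual Executor code): task runs belonging to the errors/finally branches are dropped before the restart, so those branches are no longer seen as in progress and can be taken again if needed.

import java.util.List;
import java.util.Set;

final class RestartPreparation {
    record TaskRun(String taskId) {}

    // Drop runs from the errors/finally branches so the restarted execution
    // recomputes the flowable state without them.
    static List<TaskRun> stripErrorAndFinallyRuns(List<TaskRun> runs, Set<String> errorOrFinallyTaskIds) {
        return runs.stream()
            .filter(run -> !errorOrFinallyTaskIds.contains(run.taskId()))
            .toList();
    }
}
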
Hemant M Mehta
1bd839556c fix: file-download-issue (#11774)
* fix: file-download-issue

closes: #11569

* fix: test case

Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>

---------

Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-22 11:50:17 +02:00
github-actions[bot]
761d208a08 chore(version): update to version '0.24.11' 2025-10-21 12:46:42 +00:00
Hemant M Mehta
ba55930334 fix(executions): properly handle filename with special chars (#11814)
* fix: artifact-filename-validation

closes: #10802

* fix: test

Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>

* fix: test

Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>

* fix: test

* fix(core): use deterministic file naming in FilesService

---------

Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-15 09:29:17 +02:00
github-actions[bot]
ddc072009c chore(version): update to version '0.24.10' 2025-10-14 12:31:13 +00:00
brian-mulier-p
0c7cf30df0 fix(flows): pebble autocompletion performance optimization (#11981)
closes #11881
2025-10-14 11:40:25 +02:00
nKwiatkowski
b6e613d8b3 feat(tests): add flaky tests handling 2025-10-13 17:07:23 +02:00
Sanket Mundra
ae547565cd fix(backend): failing /resume/validate endpoint for integer label values (#11688)
* fix: cast label values to string

* fix: use findByIdWithSourceWithoutAcl() instead of findByIdWithoutAcl() and add test

* remove unwanted files
2025-10-08 10:13:57 +02:00
github-actions[bot]
a3944aa710 chore(version): update to version '0.24.9' 2025-10-07 13:21:58 +00:00
Roman Acevedo
3cec08b352 test: make listDistinctNamespaces more maintainable 2025-10-07 14:46:08 +02:00
Roman Acevedo
003a6359bd test: remove findByNamespace and findDistinctNamespace
they are too hard to maintain
2025-10-07 11:15:31 +02:00
Roman Acevedo
2b804d17e4 ci: change Dockerfile.pr to dynamic version
# Conflicts:
#	Dockerfile.pr
2025-10-07 10:03:10 +02:00
Roman Acevedo
3722642977 ci: migrate CI to kestra-io/actions
- advance on https://github.com/kestra-io/kestra-ee/issues/5363
2025-10-07 10:03:10 +02:00
Roman Acevedo
7ac54bc08d fix(system): compilation issue 2025-10-06 17:51:02 +02:00
Florian Hussonnois
bbf55bee8e fix(core): use primary pebble renderer with masked functions (#11535)
Extract a PebbleEngineFactory class and refactor VariableRenderer to
support engine injection via setter; Delete DebugVariableRenderer.

Fixes: #11535
2025-10-06 17:42:22 +02:00
hemanthsavasere
44579b1b62 refactor(tests): remove outdated README for SecureVariableRendererFactory tests 2025-10-06 17:41:09 +02:00
hemanthsavasere
374c589194 feat(tests): add comprehensive tests for SecureVariableRendererFactory to ensure secret masking functionality 2025-10-06 17:41:01 +02:00
hemanthsavasere
a933ee6cbe feat(execution): add secure variable renderer factory for debug mode
Introduce SecureVariableRendererFactory to create debug renderer instances that wrap the base renderer while maintaining security by masking sensitive functions. This provides a consistent way to handle variable rendering in debug contexts.
2025-10-06 17:40:54 +02:00
Roman Acevedo
9e4eafaf84 fix(executions): try to mitigate SSE and debug log SSE errors
- advance on https://github.com/kestra-io/kestra/issues/11608
2025-10-06 12:27:40 +02:00
mustafatarek
aa8a54f51b refactor: change iteration to start with 0 2025-10-06 11:30:10 +02:00
mustafatarek
2ecdb289b6 fix(core): fix ForEach plugin task.iteration property to show the correct iteration number 2025-10-06 11:30:02 +02:00
Sandip Mandal
2f8e530ab1 chore(core): make sure kv listing is filterable (#11536)
Closes https://github.com/kestra-io/kestra/issues/11413.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-04 09:37:57 +02:00
Loïc Mathieu
6bf4aeb11b fix(executions): purge executions by 100 by default
As 500 may be too many if executions are huge, since the whole batch is loaded in memory.
2025-10-03 17:00:25 +02:00
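
A sketch of the batching pattern behind this change, against an assumed repository interface (not Kestra's actual one): only one batch of ids is held in memory at a time, and the default batch size drops from 500 to 100.

import java.util.List;

final class PurgeSketch {
    interface ExecutionStore {
        List<String> findPurgeableIds(int limit);   // assumed signature
        void deleteByIds(List<String> ids);         // deletes executions, logs and metrics
    }

    static int purge(ExecutionStore store, int batchSize) {
        int deleted = 0;
        List<String> batch;
        while (!(batch = store.findPurgeableIds(batchSize)).isEmpty()) {
            store.deleteByIds(batch);
            deleted += batch.size();
        }
        return deleted;
    }
}
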
Loïc Mathieu
fb70b7299e Revert "fix(core): properly encode filenames with spaces in URI (#11599)"
This reverts commit 59eed01e3a.
2025-10-03 16:57:54 +02:00
Loïc Mathieu
4bd261d650 fix(system): compilation issue 2025-10-03 16:15:48 +02:00
Loïc Mathieu
d4afed683c feat(executions): improve performance of PurgeExecutions by batch deleting executions, logs and metrics
Closes #11680
2025-10-03 15:29:54 +02:00
MilosPaunovic
e92b8da56a fix(core): amend add/edit actions from topology view (#11589)
Closes https://github.com/kestra-io/kestra/issues/10668.
Closes https://github.com/kestra-io/kestra/issues/11408.
Closes https://github.com/kestra-io/kestra/issues/11417.
2025-10-03 15:17:24 +02:00
Loïc Mathieu
01a145ac2c fix(system): potential NPE in Execution.withTaskRun()
This should never happen, as normally the taskrun should already be in place whenever we call this method.

But a user reported seeing it, and I have also seen it once or twice. I think it can happen when there is an unexpected event (like a restart, or a bug somewhere else that leads to an execution in an unexpected state), so it's better to fix it to be more resilient.

Fixes #11703
2025-10-03 14:33:42 +02:00
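
A sketch of the defensive check (illustrative, not the actual Execution.withTaskRun() body): the missing task run is reported as an explicit illegal state instead of surfacing as an NPE.

final class WithTaskRunGuard {
    static <T> T requireExistingTaskRun(T existing, String taskRunId) {
        if (existing == null) {
            // Should never happen, but an execution left in an unexpected state
            // (e.g. after a restart) now fails with a clear message instead of an NPE.
            throw new IllegalStateException("Task run '" + taskRunId + "' not found in execution");
        }
        return existing;
    }
}
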
Vedant794
59eed01e3a fix(core): properly encode filenames with spaces in URI (#11599)
* Fix the issue of downloading the file with space in name

* fix(core): encode filenames with spaces in URI and add test

* fix: Indent Issue and remove the empty unnecessary lines

* Resolve the error in DownloadFileTest

* Fix: DownloadFileTest issue

* resolve the weirdName issue
2025-10-03 14:22:01 +02:00
Loïc Mathieu
5efeae2a94 fix(executions): evaluate multiple conditions in a separate queue
By evaluating multiple conditions in a separate queue, we serialize their evaluation, which avoids races when we compute the outputs for flow triggers.
This is because evaluation is a multi-step process: first you get the existing condition, then you evaluate, then you store the result. As this is not guarded by a lock, it must not be done concurrently.

The race can still occur if multiple executors run, but this is less probable. A re-implementation, probably in 2.0, would be needed for that.

Fixes https://github.com/kestra-io/kestra-ee/issues/4602
2025-10-03 11:08:05 +02:00
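
A minimal sketch of the serialization idea, under the stated single-executor assumption: funneling every get-evaluate-store cycle through one single-threaded queue makes the three steps effectively atomic with respect to each other.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ConditionEvaluationQueue {
    // One worker thread: evaluations are serialized, so two get/evaluate/store
    // cycles can no longer interleave within this executor instance.
    private final ExecutorService queue = Executors.newSingleThreadExecutor();

    Future<Boolean> submit(Callable<Boolean> getEvaluateAndStore) {
        return queue.submit(getEvaluateAndStore);
    }
}
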
brian-mulier-p
650e8fac17 fix(core): avoid crashing UI in case of multiline function autocomplete (#11684) 2025-10-03 09:37:20 +02:00
YannC.
73e59496cf fix: missing import 2025-10-03 09:20:35 +02:00
YannC
ed3748e60e fix: set Label schema definition as list of label only, deprecate old… (#11648)
* fix: set Label schema definition as list of label only, deprecate old serdes for it and add schema definition for label

related to kestra-io/client-sdk#62

* fix: Modified the @Schema to avoid remove the map.class definition in schema annotation
2025-10-03 09:05:53 +02:00
Loïc Mathieu
3fcd8e9da7 fix(system): compilation issue 2025-10-01 11:43:04 +02:00
Loïc Mathieu
bc1a60233e chore(system): move the SkipExecution service to the services package
It was there before, so the change will be easier to backport if it moves back there.
2025-10-01 11:38:42 +02:00
Loïc Mathieu
bd49bb8b7b feat(system): allow skipping an indexer record
Part-of: https://github.com/kestra-io/kestra-ee/issues/5263
2025-10-01 11:37:55 +02:00
yuri
36ba850d7f chore(logs): make search queries case-insensitive (#11313)
Execution logs' filter query used to be case-sensitive - for example, the `hello` query did not match `Hello World` log lines.
2025-10-01 09:21:52 +02:00
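
A one-method sketch of the change (illustrative; the real filtering happens in the log search query): both sides are lower-cased before matching, so `hello` now matches `Hello World`.

import java.util.Locale;

final class LogSearch {
    // Case-insensitive containment check.
    static boolean matches(String logLine, String query) {
        return logLine.toLowerCase(Locale.ROOT).contains(query.toLowerCase(Locale.ROOT));
    }
}
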
github-actions[bot]
cd080831c5 chore(version): update to version '0.24.8' 2025-09-30 08:27:52 +00:00
yuri1969
8282bf7940 fix(core): enable runIf at execution updating tasks 2025-09-25 14:46:54 +02:00
github-actions[bot]
0c494ebeef chore(version): update to version '0.24.7' 2025-09-23 12:29:37 +00:00
nKwiatkowski
cc58a765e4 chore(version): update to version '0.24.6' 2025-09-23 14:29:07 +02:00
github-actions[bot]
cbcd76416a chore(version): update to version '0.24.7' 2025-09-23 09:34:52 +00:00
nKwiatkowski
e98732e121 fix(tests): compilation error 2025-09-23 11:17:51 +02:00
brian-mulier-p
19e265ee1a fix(ai): avoid moving cursor twice after using AI Copilot (#11451)
closes #11314
2025-09-23 10:35:15 +02:00
brian.mulier
d7fbac8132 fix(system): avoid trigger locking after scheduler restart
closes #11434
2025-09-22 18:39:37 +02:00
brian.mulier
430bac5039 fix(ci): same CI as develop 2025-09-22 18:39:34 +02:00
brian-mulier-p
039ccf827a fix(tests): enforce closing consumers after each tests (#11399) 2025-09-19 16:33:41 +02:00
brian-mulier-p
e15287a79b fix(core): avoid ClassCastException when doing secret decryption (#11393)
closes kestra-io/kestra-ee#5191
2025-09-19 11:35:26 +02:00
Sandip Mandal
798ed061d4 fix(core): webhook curl command needs tenant (#11391) 2025-09-19 14:25:48 +05:30
Loïc Mathieu
9717faaade fix(executions): concurrency limit should update the execution
If it's not updated in the database, it would not be detected as changed, so terminal actions (like purge) would not be performed.

Fixes #11022
Fixes #11025
Fixes #8143
2025-09-18 12:12:18 +02:00
Loïc Mathieu
631f0c5cdb fix(executions): the Exit task did not correctly end parent tasks
Fixes https://github.com/kestra-io/kestra-ee/issues/5168
2025-09-18 11:39:52 +02:00
brian.mulier
bbf619246e fix(core): filters weren't applying anymore 2025-09-17 18:01:02 +02:00
Loïc Mathieu
cb307780e4 fix(executions): possible NPE on dynamic taskrun
Fixes https://github.com/kestra-io/kestra-ee/issues/5166
2025-09-17 15:57:42 +02:00
brian.mulier
95f2bbea1d fix(core): avoid filters from overlapping on other pages when changing query params 2025-09-17 10:39:43 +02:00
brian.mulier
5cb9a340a7 fix(core): avoid clearing filters when reclicking on current left menu item
closes #9476
2025-09-17 10:39:39 +02:00
brian.mulier
c5ccae287e fix(core): avoid undefined error on refresh chart 2025-09-17 10:39:05 +02:00
github-actions[bot]
40a9732dac chore(version): update to version '0.24.6' 2025-09-16 09:21:18 +00:00
brian-mulier-p
ae7aefcc17 fix(security): enhance basic auth security (#11285)
closes kestra-io/kestra-ee#5111
2025-09-15 16:27:58 +02:00
Loïc Mathieu
0a81f67981 chore(version): upgrade to 0.24.5 2025-09-09 11:37:35 +02:00
Bart Ledoux
c4757ed915 fix(flow): arrays of values would never be updated
closes #10784
2025-09-08 11:40:33 +02:00
Miloš Paunović
4577070e32 fix(flow): properly handle tab closing by clicking the cross icon in the corner of the panel (#11090)
Relates to https://github.com/kestra-io/kestra/issues/10981.
2025-09-04 14:21:25 +02:00
github-actions[bot]
7bd519ddb4 chore(version): update to version '0.24.4' 2025-09-02 12:17:39 +00:00
Ludovic DEHON
62c85078b6 refactor(ui): posthog as composable and option for ui telemetry
relate to kestra-io/kestra-ee#4831
2025-09-02 11:59:59 +05:30
Ludovic DEHON
3718be9658 refactor(ui): posthog as composable and option for ui telemetry
relate to kestra-io/kestra-ee#4831
2025-09-02 11:56:24 +05:30
Roman Acevedo
2df6c1b730 ci: fix setversion-tag.yml not triggering a main.yml job on a pushed tag
the missing token: ${{ secrets.GH_PERSONAL_TOKEN }} is the only difference between this CI and EE CI, so it is probably the right fix
2025-09-01 16:29:09 +02:00
Ludovic DEHON
9af86ea677 feat(core): add thread http client, deadlock and virtual thread metrics 2025-09-01 00:31:53 +02:00
Ludovic DEHON
8601905994 fix(core): disable useless health check 2025-09-01 00:31:42 +02:00
Ludovic DEHON
9de1a15d02 feat(core): add netty metrics on micrometer 2025-09-01 00:31:10 +02:00
Ludovic DEHON
25b056ebb3 fix(core): align open source & ee configuration 2025-09-01 00:29:54 +02:00
Loïc Mathieu
87d8f9867f fix(executions): clear errors/finally/afterExecution branches when changing the state of a taskrun
As changing the state of a taskrun will restart the flow, if we didn't clear those branches the flow would not restart properly.

Fixes https://github.com/kestra-io/kestra-ee/issues/3211
2025-08-29 16:26:01 +02:00
Piyush Bhaskar
a00c1f8397 fix(ui): do not allow white space in password (#10987) 2025-08-29 16:49:18 +05:30
github-actions[bot]
f4470095ff chore(core): localize to languages other than english (#10933)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-08-28 11:39:21 +02:00
brian.mulier
cbfaa8815d fix(dashboard): working dashboard edit 2025-08-28 11:39:21 +02:00
brian.mulier
10e55bbb77 fix(dashboard): don't duplicate id on source retrieval 2025-08-28 11:39:21 +02:00
brian-mulier-p
59d5d4cb91 feat(dashboard): mandatory id + add autogenerated id to source for legacy handling (#10912)
closes kestra-io/kestra-ee#4484
2025-08-28 11:39:21 +02:00
brian.mulier
e8ee3b0a84 fix(core): allow some left menu methods inheritance
part of kestra-io/kestra-ee#4728
2025-08-27 10:49:16 +02:00
YannC.
602ff849e3 fix: clean translation 2025-08-26 17:49:05 +02:00
github-actions[bot]
155bdca83f chore(version): update to version '0.24.3' 2025-08-26 13:10:59 +00:00
github-actions[bot]
faaaeada3a chore(core): localize to languages other than english (#10904)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-08-26 12:27:13 +02:00
Piyush Bhaskar
6ef35974d7 fix(core): do not overflow the version selection on release notes (#10903) 2025-08-26 13:28:03 +05:30
YannC
46f9bb768f feat: add action to merge release note between OSS and EE (#10882) 2025-08-26 09:42:47 +02:00
Piyush Bhaskar
ab87f63e8c fix(ui): bring better small chart and tooltip. (#10839) 2025-08-26 13:06:04 +05:30
YannC
cdb73ccbd7 fix: allow to enforce editor view when list is unreadable, also truncate too long column (#10885) 2025-08-26 09:12:04 +02:00
brian.mulier
8fc936e0a3 fix(logs): emitAsync now keeps message order 2025-08-25 16:54:45 +02:00
brian.mulier
1e0ebc94b8 fix(logs): higher max message length to keep stacktraces in a single log 2025-08-25 16:54:45 +02:00
brian.mulier
5318592eff chore(deps): bump Micronaut platform to 4.9.2
closes #10626
closes #10788
2025-08-25 16:54:45 +02:00
Piyush Bhaskar
2da08f160d fix(core): show the logs for the task from topology graph. (#10890) 2025-08-25 18:48:46 +05:30
Roman Acevedo
8cbc9e7aff ci: backport recent docker semver rework
backport changes from develop made in https://github.com/kestra-io/kestra/pull/10848
2025-08-22 15:26:32 +02:00
brian-mulier-p
f8e15d103f fix(kv): Set task should convert numbers to string if kvType == STRING (#10836) 2025-08-21 09:33:43 +02:00
Loïc Mathieu
49794a4f2a fix(system): properly close the ScheduledExecutorService tasks
This avoids having running threads while the component is supposed to be closed.
2025-08-20 15:57:43 +02:00
Nicolas K.
bafa5fe03c fix(test): disable kafka concurrency queue test (#10755)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-19 18:12:26 +02:00
nKwiatkowski
208b244f0f chore: update version to 0.24.2 2025-08-19 15:15:36 +02:00
Barthélémy Ledoux
b93976091d fix(core): avoid triggering hundreds of reactivity updates for each icon (#10766) 2025-08-19 14:07:55 +02:00
brian.mulier
eec52d76f0 fix(namespaces): namespace files content was not sent to the flow namespace
part of #7499
2025-08-19 12:18:12 +02:00
brian.mulier
b96fd87572 fix(core): change cache policy on files returned by the webserver that need to stay fresh
closes #7499
2025-08-19 11:55:08 +02:00
brian.mulier
1aa5bfab43 fix(namespaces): properly send editor content upon creating / updating ns file
part of #7499
2025-08-19 11:54:58 +02:00
Roman Acevedo
c4572e86a5 fix(tests): filter out ExecutionKind.TEST from FlowTriggers
- fixes "Silence flow trigger on TEST-kind executions" (kestra-ee#4689)
2025-08-19 11:08:06 +02:00
Florian Hussonnois
f2f97bb70c fix(core): fix preconditions rendering for ExecutionOutputs (#10651)
Ensure that preconditions are always re-rendered for any
new executions

Changes:
* add new fluent skipCache methods on RunContextProperty and Property
  classes

Fix: #10651
2025-08-18 21:01:05 +02:00
Roman Acevedo
804c740d3c fix(tests): namespace binding was breaking filtering in Flow page
fixes https://github.com/kestra-io/kestra-ee/issues/4691

the additional namespace binding in Tabs was added in PR https://github.com/kestra-io/kestra/pull/10543 to solve the special case of Namespace creation
2025-08-18 15:05:26 +02:00
Loïc Mathieu
75cd4f44e0 fix(execution): parallel flowable may not end all child flowables
Parallel flowable tasks like `Parallel`, `Dag`, and `ForEach` are racy. When a task fails in one branch, other concurrent branches that contain flowables may never end.
We make sure that all children are terminated when a flowable is itself terminated.

Fixes #6780
2025-08-14 12:26:37 +02:00
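
A minimal sketch of the termination sweep, with an illustrative Branch interface rather than the actual flowable types: when the parent flowable reaches a terminal state, any child branch still running is terminated too.

import java.util.List;

final class FlowableTermination {
    interface Branch {
        boolean isTerminated();
        void terminate(); // assumed to recursively terminate the branch's own children
    }

    static void onParentTerminated(List<Branch> children) {
        for (Branch child : children) {
            if (!child.isTerminated()) {
                child.terminate(); // no concurrent branch is left dangling after a sibling fails
            }
        }
    }
}
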
YannC
f167a2a2bb fix: avoid file being displayed as diff in namespace file editor (#10746)
close #10744
2025-08-14 10:38:51 +02:00
Loïc Mathieu
08d9416e3a fix(execution): concurrency limit didn't work with afterExecutions
This is because the execution is never considered fully terminated, so the concurrency limit is not handled properly. This should also affect SLA, trigger locks, and other cleanup work.

The root issue is that, with a worker task from afterExecution, there is no further update on the execution itself (as it's already terminated), so no execution messages are processed again by the executor.

Because of that, the worker task result message from the afterExecution block is the last message; but unfortunately, as messages from the worker task result have no flow attached, the computation of the final termination is incorrect.
The solution is to load the flow inside the executor when it is null and the execution is terminated, which should only occur inside afterExecution.

Fixes #10657
Fixes #8459
Fixes #8609
2025-08-13 09:32:40 +02:00
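
A sketch of the described fallback with assumed types (the real executor code differs): the flow is loaded on demand only in the one case where the message cannot carry it, i.e. a terminated execution whose last message comes from an afterExecution worker task result.

import java.util.function.BiFunction;

final class FlowResolution<F> {
    private final BiFunction<String, String, F> loader; // assumed: (namespace, flowId) -> flow

    FlowResolution(BiFunction<String, String, F> loader) {
        this.loader = loader;
    }

    F resolve(F flowFromMessage, String namespace, String flowId, boolean executionTerminated) {
        if (flowFromMessage == null && executionTerminated) {
            // Per the commit message, this combination should only occur for
            // afterExecution worker task results.
            return loader.apply(namespace, flowId);
        }
        return flowFromMessage;
    }
}
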
Prayag
2a879c617c fix(core): Enter key is now validating filter / refreshing data (#9630)
closes #9471

---------

Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-08-12 17:29:45 +02:00
Loïc Mathieu
3227ca7c11 fix(executions): SLA monitor should take into account restarted executions 2025-08-12 11:50:03 +02:00
Loïc Mathieu
428a52ce02 fix(executions): concurrency limit exceeded when restarting an execution
Fixes #7880
2025-08-12 11:49:40 +02:00
YannC.
f58bc4caba chore: update version to 0.23.11 2025-08-12 10:58:41 +02:00
Loïc Mathieu
e99ae9513f fix(executions): correctly fail the request when trying to resume an execution with the wrong inputs
Fixes #9959
2025-08-12 09:40:44 +02:00
Piyush Bhaskar
c8b51fcacf fix(core): reduce size of code block text and padding (#10689) 2025-08-12 11:47:41 +05:30
brian.mulier
813b2f6439 fix(dashboard): avoid duplicate dashboard calls + properly refresh dashboards on refresh button + don't discard component entirely on refresh 2025-08-11 22:29:15 +02:00
brian.mulier
c6b5bca25b fix(dashboard): properly use time filters in queries
closes kestra-io/kestra-ee#4389
2025-08-11 22:29:15 +02:00
brian.mulier
de35d2cdb9 tests(core): add a test to taskrunners to ensure they work multiple times on the same working directory
part of kestra-io/plugin-ee-kubernetes#45
2025-08-11 15:06:21 +02:00
Loïc Mathieu
a6ffbd59d0 fix(executions): properly fail the task if it contains an unsupported unicode sequence
This occurs in Postgres with the `\u0000` unicode sequence: Postgres refuses to store any JSONB containing this sequence, as it has no textual representation.
We now properly detect that and fail the task.

Fixes #10326
2025-08-11 11:54:16 +02:00
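
A short sketch of the detection (illustrative): the serialized output is scanned for the NUL character before being stored, and the task fails with an explicit error rather than an opaque Postgres JSONB rejection.

final class JsonbOutputGuard {
    // Scan for NUL (code point 0) up front; Postgres rejects JSONB containing it.
    static void requireStorable(String serializedOutput) {
        if (serializedOutput != null && serializedOutput.indexOf('\u0000') >= 0) {
            throw new IllegalArgumentException(
                "Output contains the \\u0000 unicode sequence, which Postgres cannot store as JSONB");
        }
    }
}
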
Piyush Bhaskar
568740a214 fix(flows): copy trigger url properly. (#10645) 2025-08-08 13:13:02 +05:30
Loïc Mathieu
aa0d2c545f fix(executions): allow caching tasks that use the 'workingDir' variable
Fixes #10253
2025-08-08 09:08:00 +02:00
brian.mulier
cda77d5146 fix(core): ensure props with defaults are not marked as required in generated doc 2025-08-07 15:10:16 +02:00
brian.mulier
d4fd1f61ba fix(core): wrong @NotNull import leading to key not being marked as required
closes #9287
2025-08-07 15:10:16 +02:00
github-actions[bot]
9859ea5eb6 chore(version): update to version '0.24.0' 2025-08-05 12:01:23 +00:00
Piyush Bhaskar
aca374a28f fix(flows): ensure plugin documentation change on flow switch (#10546)
Co-authored-by: Barthélémy Ledoux <bledoux@kestra.io>
2025-08-05 15:21:21 +05:30
Barthélémy Ledoux
c413ba95e1 fix(flows): add conditional rendering for restart button based on execution (#10570) 2025-08-05 10:22:53 +02:00
Barthélémy Ledoux
9c6b92619e fix: restore InputForm (#10568) 2025-08-05 09:45:10 +02:00
brian.mulier
8173e8df51 fix(namespaces): autocomplete in kv & secrets
related to kestra-io/kestra-ee#4559
2025-08-04 20:30:06 +02:00
brian.mulier
5c95505911 fix(executions): avoid SSE error in follow execution dependencies
closes #10560
2025-08-04 20:23:40 +02:00
Barthélémy Ledoux
33f0b533bb fix(flows): load flow for execution needs to be stored most of the time (#10566) 2025-08-04 18:55:57 +02:00
brian.mulier
23e35a7f97 chore(version): upgrade version to 0.24.0-rc2-SNAPSHOT 2025-08-04 16:19:44 +02:00
Abhilash T
0357321c58 fix: Updated InputsForm.vue to clear Radio Button Selection (#9654)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-08-04 16:03:55 +02:00
Barthélémy Ledoux
5c08403398 fix(flows): no-code - when changing type message avoid warning (#10498) 2025-08-04 15:57:23 +02:00
Barthélémy Ledoux
a63cb71218 fix: remove debugging value from playground (#10541) 2025-08-04 15:57:05 +02:00
brian.mulier
317885b91c fix(executions): restore execution redirect & subflow logs view from parent
closes #10528
closes #10551
2025-08-04 15:47:49 +02:00
Piyush Bhaskar
87637302e4 chore(core): remove variable and directly assign. (#10554) 2025-08-04 18:51:14 +05:30
Piyush Bhaskar
056faaaf9f fix(core): proper state detection from parsed data (#10527) 2025-08-04 18:50:53 +05:30
Miloš Paunović
54c74a1328 chore(namespaces): add the needed prop for loading all namespaces inside a selector (#10544) 2025-08-04 12:45:06 +02:00
Miloš Paunović
fae0c88c5e fix(namespaces): amend problems with namespace secrets and kv pairs (#10543)
Closes https://github.com/kestra-io/kestra-ee/issues/4584.
2025-08-04 12:20:37 +02:00
YannC.
db5d83d1cb fix: add missing webhook releases secrets for github releases 2025-08-01 23:22:18 +02:00
brian.mulier
066b947762 fix(core): remove icon for inputs in no-code
closes #10520
2025-08-01 16:32:55 +02:00
Piyush Bhaskar
b6597475b1 fix(namespaces): fixes loading of additional ns (#10518) 2025-08-01 17:01:53 +05:30
brian.mulier
f2610baf15 fix(executions): avoid race condition leading to never-ending follow with non-terminal state 2025-08-01 13:12:59 +02:00
brian.mulier
b619bf76d8 fix(core): ensure instances can read all messages when no consumer group / queue type 2025-08-01 13:12:59 +02:00
Loïc Mathieu
117f453a77 feat(flows): warn on runnable-only properties on non-runnable tasks
Closes #9967
Closes #10500
2025-08-01 12:53:24 +02:00
Piyush Bhaskar
053d6276ff fix(executions): do not rely on monaco to get value (#10515) 2025-08-01 13:26:25 +05:30
Barthélémy Ledoux
3870eca70b fix(flows): playground need to use ui-libs (#10506) 2025-08-01 09:06:51 +02:00
Piyush Bhaskar
afd7c216f9 fix(flows): route to flow page (#10514) 2025-08-01 12:13:08 +05:30
Piyush Bhaskar
59a17e88e7 fix(executions): properly handle methods and computed for tabs (#10513) 2025-08-01 12:12:54 +05:30
Piyush Bhaskar
99f8dca1c2 fix(editor): adjust padding for editor (#10497)
* fix(editor): adjust padding for editor

* fix: make padding 16px
2025-08-01 12:12:38 +05:30
YannC
1068c9fe51 fix: handle empty flows list in lastExecutions correctly (#10493) 2025-08-01 07:21:16 +02:00
YannC
ea6d30df7c fix(ui): load correctly filters + refresh dashboard on filter change (#10504) 2025-08-01 07:16:34 +02:00
Loïc Mathieu
04ba7363c2 fix(ci): workflow build artifact doesn't need the plugin version 2025-07-31 14:32:57 +02:00
Loïc Mathieu
281a987944 chore(version): upgrade version to 0.24.0-rc1-SNAPSHOT 2025-07-31 14:20:07 +02:00
github-actions[bot]
c9ce54b0be chore(core): localize to languages other than english (#10494)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 14:19:14 +02:00
github-actions[bot]
ccd9baef3c chore(core): localize to languages other than english (#10489)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 14:19:04 +02:00
Barthélémy Ledoux
97869b9c75 fix(flows): forget all old taskRunIds when a new execution starts (#10487) 2025-07-31 14:17:14 +02:00
Barthélémy Ledoux
1c681c1492 fix(flows): wait longer for widgets to be rendered (#10485) 2025-07-31 14:17:06 +02:00
Barthélémy Ledoux
de2a446f93 fix(flows): load flows documentation when coming back to no-code root (#10374) 2025-07-31 14:17:00 +02:00
Barthélémy Ledoux
d778947017 fix(flows): add the load errors to the flow errors (#10483) 2025-07-31 14:16:47 +02:00
Barthélémy Ledoux
3f97845fdd fix(flows): hide executionkind meta in the logs (#10482) 2025-07-31 14:16:41 +02:00
Barthélémy Ledoux
631cd169a1 fix(executions): do not rely on monaco to get value (#10467) 2025-07-31 14:16:33 +02:00
Barthélémy Ledoux
1648fa076c fix(flows): playground - implement new designs (#10459)
Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-07-31 14:16:26 +02:00
Barthélémy Ledoux
474806882e fix(flows): playground: align restart button (#10415) 2025-07-31 14:16:17 +02:00
Barthélémy Ledoux
65467bd118 fix(flows): playground clear current execution when clearExecutions() (#10414) 2025-07-31 14:16:04 +02:00
YannC
387bbb80ac feat(ui): added http method autocompletion (#10492) 2025-07-31 13:29:22 +02:00
Loïc Mathieu
19d4c64f19 fix(executions): Don't create outputs from the Subflow task when we didn't wait
If we didn't wait for the subflow execution, we cannot access its outputs.
2025-07-31 13:07:26 +02:00
Loïc Mathieu
809c0a228c feat(system): improve performance of computeSchedulable
- Store flowIds in a list to avoid computing them multiple times
- Store triggers by ID in a map to avoid iterating the list of triggers for each flow (sketched below)
2025-07-31 12:34:30 +02:00
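
A sketch of the second bullet, with illustrative types: triggers are indexed by id once, turning the per-flow scan of the trigger list into a direct map lookup.

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class SchedulableIndex {
    record Trigger(String id) {}

    // Built once per computeSchedulable pass; each flow then looks its trigger
    // up directly instead of iterating the whole list.
    static Map<String, Trigger> byId(List<Trigger> triggers) {
        return triggers.stream()
            .collect(Collectors.toMap(Trigger::id, Function.identity(), (first, dup) -> first));
    }
}
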
Piyush Bhaskar
6a045900fb fix(core): remove top spacing from no execution page and remove the redundant code (#10445)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-31 13:28:08 +05:30
Piyush Bhaskar
4ada5fe8f3 fix(executions): make columns that are not links normal text (#10460)
* fix(executions): make it normal text

* fix(executions): use monospace font only
2025-07-31 13:27:45 +05:30
github-actions[bot]
998087ca30 chore(core): localize to languages other than english (#10471)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 08:25:16 +02:00
Malaydewangan09
146338e48f feat(plugins): add script plugins 2025-07-30 23:34:55 +05:30
brian.mulier
de177b925e chore(deps): hardcode vue override version 2025-07-30 19:26:31 +02:00
brian.mulier
04bfb19095 fix(core): avoid the follow execution being discarded too early
closes #10472
closes #7623
2025-07-30 19:26:31 +02:00
brian-mulier-p
c913c48785 fix(core): redesign playground run task button (#10423)
closes #10389
2025-07-30 15:27:49 +02:00
François Delbrayelle
0d5b593d42 fix(): fix icons 2025-07-30 14:55:33 +02:00
weibo1
83f92535c5 feat: Trigger Initialization Method Performance Optimization 2025-07-30 14:54:08 +02:00
Loïc Mathieu
fd6a0a6c11 fix(ci): bad SNAPSHOT repo URL 2025-07-30 12:57:28 +02:00
Loïc Mathieu
104c4c97b4 fix(ci): don't publish docker in build-artifact 2025-07-30 12:05:30 +02:00
Loïc Mathieu
21cd21269f fix(ci): add missing build artifact job 2025-07-30 11:50:26 +02:00
Loïc Mathieu
679befa2fe build(ci): allow downloading the exe from the workflow and not the release
This would allow running the workflow even if the release step fails
2025-07-30 11:24:21 +02:00
YannC
8a0ecdeb8a fix(dashboard): pageSize & pageNumber are now correctly passed when fetching a chart (#10413) 2025-07-30 08:45:51 +02:00
YannC.
ee8762e138 fix(ci): correctly pass GH token to release workflow 2025-07-29 15:04:18 +02:00
github-actions[bot]
d16324f265 chore(version): update to version 'v0.24.0-rc0-SNAPSHOT'. 2025-07-29 12:14:49 +00:00
264 changed files with 5530 additions and 3500 deletions


@@ -1,29 +0,0 @@
name: 'Load Kestra Plugin List'
description: 'Composite action to load list of plugins'
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: true
plugin-file:
description: "File of the plugins"
default: './.plugins'
required: true
outputs:
plugins:
description: "List of all Kestra plugins"
value: ${{ steps.plugins.outputs.plugins }}
repositories:
description: "List of all Kestra repositories of plugins"
value: ${{ steps.plugins.outputs.repositories }}
runs:
using: composite
steps:
- name: Get Plugins List
id: plugins
shell: bash
run: |
PLUGINS=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | sed -e "s/LATEST/${{ inputs.plugin-version }}/g" | cut -d':' -f2- | xargs || echo '');
REPOSITORIES=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | cut -d':' -f1 | uniq | sort | xargs || echo '')
echo "plugins=$PLUGINS" >> $GITHUB_OUTPUT
echo "repositories=$REPOSITORIES" >> $GITHUB_OUTPUT


@@ -1,20 +0,0 @@
name: 'Setup vars'
description: 'Composite action to setup common vars'
outputs:
tag:
description: "Git tag"
value: ${{ steps.vars.outputs.tag }}
commit:
description: "Git commit"
value: ${{ steps.vars.outputs.commit }}
runs:
using: composite
steps:
# Setup vars
- name: Set variables
id: vars
shell: bash
run: |
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
echo "commit=$(git rev-parse --short "$GITHUB_SHA")" >> $GITHUB_OUTPUT


@@ -1,67 +0,0 @@
name: Auto-Translate UI keys and create PR
on:
schedule:
- cron: "0 9-21 * * *" # Every hour from 9 AM to 9 PM
workflow_dispatch:
inputs:
retranslate_modified_keys:
description: "Whether to re-translate modified keys even if they already have translations."
type: choice
options:
- "false"
- "true"
default: "false"
required: false
jobs:
translations:
name: Translations
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
name: Checkout
with:
fetch-depth: 0
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.x"
- name: Install Python dependencies
run: pip install gitpython openai
- name: Generate translations
run: python ui/src/translations/generate_translations.py ${{ github.event.inputs.retranslate_modified_keys }}
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
- name: Set up Node
uses: actions/setup-node@v4
with:
node-version: "20.x"
- name: Set up Git
run: |
git config --global user.name "GitHub Action"
git config --global user.email "actions@github.com"
- name: Commit and create PR
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
BRANCH_NAME="chore/update-translations-$(date +%s)"
git checkout -b $BRANCH_NAME
git add ui/src/translations/*.json
if git diff --cached --quiet; then
echo "No changes to commit. Exiting with success."
exit 0
fi
git commit -m "chore(core): localize to languages other than english" -m "Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference."
git push -u origin $BRANCH_NAME || (git push origin --delete $BRANCH_NAME && git push -u origin $BRANCH_NAME)
gh pr create --title "Translations from en.json" --body $'This PR was created automatically by a GitHub Action.\n\nSomeone from the @kestra-io/frontend team needs to review and merge.' --base ${{ github.ref_name }} --head $BRANCH_NAME
- name: Check keys matching
run: node ui/src/translations/check.js


@@ -1,85 +0,0 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
name: "CodeQL"
on:
schedule:
- cron: '0 5 * * 1'
workflow_dispatch: {}
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
# Override automatic language detection by changing the below list
# Supported options are ['csharp', 'cpp', 'go', 'java', 'javascript', 'python']
language: ['java', 'javascript']
# Learn more...
# https://docs.github.com/en/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#overriding-automatic-language-detection
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.
fetch-depth: 2
# If this run was triggered by a pull request event, then checkout
# the head of the pull request instead of the merge commit.
- run: git checkout HEAD^2
if: ${{ github.event_name == 'pull_request' }}
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Set up JDK
- name: Set up JDK
uses: actions/setup-java@v4
if: ${{ matrix.language == 'java' }}
with:
distribution: 'temurin'
java-version: 21
- name: Setup gradle
if: ${{ matrix.language == 'java' }}
uses: gradle/actions/setup-gradle@v4
- name: Build with Gradle
if: ${{ matrix.language == 'java' }}
run: ./gradlew testClasses -x :ui:assembleFrontend
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
if: ${{ matrix.language != 'java' }}
uses: github/codeql-action/autobuild@v3
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3


@@ -1,147 +0,0 @@
name: Create Docker images on Release
on:
workflow_dispatch:
inputs:
retag-latest:
description: 'Retag latest Docker images'
required: true
type: string
default: "false"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag'
required: false
type: string
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
jobs:
plugins:
name: List Plugins
runs-on: ubuntu-latest
outputs:
plugins: ${{ steps.plugins.outputs.plugins }}
steps:
# Checkout
- uses: actions/checkout@v4
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
docker:
name: Publish Docker
needs: [ plugins ]
runs-on: ubuntu-latest
strategy:
matrix:
image:
- name: "-no-plugins"
plugins: ""
packages: jattach
python-libs: ""
- name: ""
plugins: ${{needs.plugins.outputs.plugins}}
packages: python3 python-is-python3 python3-pip curl jattach
python-libs: kestra
steps:
- uses: actions/checkout@v4
# Vars
- name: Set image name
id: vars
run: |
if [[ "${{ inputs.release-tag }}" == "" ]]; then
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
else
TAG="${{ inputs.release-tag }}"
echo "tag=${TAG}" >> $GITHUB_OUTPUT
fi
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# Download release
- name: Download release
uses: robinraju/release-downloader@v1.12
with:
tag: ${{steps.vars.outputs.tag}}
fileName: 'kestra-*'
out-file-path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker setup
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# Docker Build and push
- name: Push to Docker Hub
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
APT_PACKAGES=${{ matrix.image.packages }}
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
- name: Install regctl
if: github.event.inputs.retag-latest == 'true'
uses: regclient/actions/regctl-installer@main
- name: Retag to latest
if: github.event.inputs.retag-latest == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
- docker
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ':github-actions:'
channel: 'C02DQ1A7JLR' # _int_git channel


@@ -1,86 +0,0 @@
name: 'E2E tests revival'
description: 'New E2E tests implementation started by Roman. Based on Playwright in the npm UI project, it tests the Kestra OSS develop Docker image. These tests are written from zero, lets make them unflaky from the start!'
on:
schedule:
- cron: "0 * * * *" # Every hour
workflow_call:
inputs:
noInputYet:
description: 'not input yet.'
required: false
type: string
default: "no input"
workflow_dispatch:
inputs:
noInputYet:
description: 'not input yet.'
required: false
type: string
default: "no input"
jobs:
check:
timeout-minutes: 10
runs-on: ubuntu-latest
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
steps:
- name: Login to DockerHub
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ github.token }}
- name: Checkout kestra
uses: actions/checkout@v4
with:
path: kestra
# Setup build
- uses: kestra-io/actions/.github/actions/setup-build@main
name: Setup - Build
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
- name: Install Npm dependencies
run: |
cd kestra/ui
npm i
npx playwright install --with-deps chromium
- name: Run E2E Tests
run: |
cd kestra
sh build-and-start-e2e-tests.sh
- name: Upload Playwright Report as Github artifact
# 'With this report, you can analyze locally the results of the tests. see https://playwright.dev/docs/ci-intro#html-report'
uses: actions/upload-artifact@v4
if: ${{ !cancelled() }}
with:
name: playwright-report
path: kestra/ui/playwright-report/
retention-days: 7
# Allure check
# TODO I don't know what it should do
# - uses: rlespinasse/github-slug-action@v5
# name: Allure - Generate slug variables
#
# - name: Allure - Publish report
# uses: andrcuns/allure-publish-action@v2.9.0
# if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
# continue-on-error: true
# env:
# GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
# JAVA_HOME: /usr/lib/jvm/default-jvm/
# with:
# storageType: gcs
# resultsGlob: "**/build/allure-results"
# bucket: internal-kestra-host
# baseUrl: "https://internal.dev.kestra.io"
# prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
# copyLatest: true
# ignoreMissingResults: true


@@ -1,82 +0,0 @@
name: Run Gradle Release for Kestra Plugins
on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.0-rc1)'
required: true
type: string
nextVersion:
description: 'The next version (e.g., 0.22.0-SNAPSHOT)'
required: true
type: string
dryRun:
description: 'Use DRY_RUN mode'
required: false
default: 'false'
jobs:
release:
name: Release plugins
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v4
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
caches-enabled: true
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins-list
with:
plugin-version: 'LATEST'
- name: 'Configure Git'
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
# Execute
- name: Run Gradle Release
if: ${{ github.event.inputs.dryRun == 'false' }}
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
--yes \
${{ steps.plugins-list.outputs.repositories }}
- name: Run Gradle Release (DRY_RUN)
if: ${{ github.event.inputs.dryRun == 'true' }}
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
--dry-run \
--yes \
${{ steps.plugins-list.outputs.repositories }}


@@ -1,92 +0,0 @@
name: Run Gradle Release
run-name: "Releasing Kestra ${{ github.event.inputs.releaseVersion }} 🚀"
on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.0-rc1)'
required: true
type: string
nextVersion:
description: 'The next version (e.g., 0.22.0-SNAPSHOT)'
required: true
type: string
env:
RELEASE_VERSION: "${{ github.event.inputs.releaseVersion }}"
NEXT_VERSION: "${{ github.event.inputs.nextVersion }}"
jobs:
release:
name: Release Kestra
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/develop'
steps:
# Checks
- name: Check Inputs
run: |
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$ ]]; then
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-rc[01](-SNAPSHOT)?$"
exit 1
fi
if ! [[ "$NEXT_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$ ]]; then
echo "Invalid next version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$"
exit 1;
fi
# Checkout
- uses: actions/checkout@v4
with:
fetch-depth: 0
path: kestra
# Checkout GitHub Actions
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
caches-enabled: true
- name: Configure Git
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
# Execute
- name: Run Gradle Release
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
# Extract the major and minor versions
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
PUSH_RELEASE_BRANCH="releases/v${BASE_VERSION}.x"
cd kestra
# Create and push release branch
git checkout -b "$PUSH_RELEASE_BRANCH";
git push -u origin "$PUSH_RELEASE_BRANCH";
# Run gradle release
git checkout develop;
if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
# The -SNAPSHOT qualifier may be used to test release candidates
./gradlew release -Prelease.useAutomaticVersion=true \
-Prelease.releaseVersion="${RELEASE_VERSION}" \
-Prelease.newVersion="${NEXT_VERSION}" \
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}" \
-Prelease.failOnSnapshotDependencies=false
else
./gradlew release -Prelease.useAutomaticVersion=true \
-Prelease.releaseVersion="${RELEASE_VERSION}" \
-Prelease.newVersion="${NEXT_VERSION}" \
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
fi

.github/workflows/main-build.yml (new file)

@@ -0,0 +1,87 @@
name: Main Workflow
on:
push:
branches:
- releases/*
- develop
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-main
cancel-in-progress: true
jobs:
backend-tests:
name: Backend tests
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
frontend-tests:
name: Frontend tests
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
publish-develop-docker:
name: Publish Docker
needs: [backend-tests, frontend-tests]
if: "!failure() && !cancelled() && github.ref == 'refs/heads/develop'"
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
with:
plugin-version: 'LATEST-SNAPSHOT'
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
publish-develop-maven:
name: Publish develop Maven
needs: [ backend-tests, frontend-tests ]
if: "!failure() && !cancelled() && github.ref == 'refs/heads/develop'"
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-maven.yml@main
secrets:
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
end:
runs-on: ubuntu-latest
needs: [publish-develop-docker, publish-develop-maven]
if: always()
steps:
- name: Trigger EE Workflow
uses: peter-evans/repository-dispatch@v3
if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/kestra-ee
event-type: "oss-updated"
# Slack
- name: Slack - Notification
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
uses: kestra-io/actions/composite/slack-status@main
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}


@@ -1,73 +0,0 @@
name: Main Workflow
on:
workflow_dispatch:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: true
type: string
push:
branches:
- master
- main
- releases/*
- develop
tags:
- v*
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-main
cancel-in-progress: true
jobs:
tests:
name: Execute tests
uses: ./.github/workflows/workflow-test.yml
with:
report-status: false
release:
name: Release
needs: [tests]
if: "!startsWith(github.ref, 'refs/heads/releases')"
uses: ./.github/workflows/workflow-release.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
end:
runs-on: ubuntu-latest
needs:
- release
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
- name: Trigger EE Workflow
uses: peter-evans/repository-dispatch@v3
if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/kestra-ee
event-type: "oss-updated"
# Slack
- name: Slack - Notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ":github-actions:"
channel: "C02DQ1A7JLR" # _int_git channel

.github/workflows/pre-release.yml (new file)

@@ -0,0 +1,60 @@
name: Pre Release
on:
push:
tags:
- 'v*'
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
jobs:
build-artifacts:
name: Build Artifacts
uses: kestra-io/actions/.github/workflows/kestra-oss-build-artifacts.yml@main
backend-tests:
name: Backend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
frontend-tests:
name: Frontend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
publish-maven:
name: Publish Maven
needs: [ backend-tests, frontend-tests ]
if: "!failure() && !cancelled()"
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-maven.yml@main
secrets:
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
publish-github:
name: Github Release
needs: [build-artifacts, backend-tests, frontend-tests]
if: "!failure() && !cancelled()"
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-github.yml@main
secrets:
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}


@@ -0,0 +1,16 @@
name: Pull Request - Delete Docker
on:
pull_request:
types: [closed]
# TODO import a reusable one
jobs:
publish:
name: Pull Request - Delete Docker
if: github.repository == 'kestra-io/kestra' # prevent running on forks
runs-on: ubuntu-latest
steps:
- uses: dataaxiom/ghcr-cleanup-action@v1
with:
package: kestra-pr
delete-tags: ${{ github.event.pull_request.number }}


@@ -2,17 +2,12 @@ name: Pull Request Workflow
on:
pull_request:
branches:
- develop
concurrency:
group: ${{ github.workflow }}-${{ github.ref_name }}-pr
cancel-in-progress: true
jobs:
# ********************************************************************************************************************
# File changes detection
# ********************************************************************************************************************
file-changes:
if: ${{ github.event.pull_request.draft == false }}
name: File changes detection
@@ -33,14 +28,11 @@ jobs:
- '!{ui,.github}/**'
token: ${{ secrets.GITHUB_TOKEN }}
# ********************************************************************************************************************
# Tests
# ********************************************************************************************************************
frontend:
name: Frontend - Tests
needs: [file-changes]
if: "needs.file-changes.outputs.ui == 'true'"
uses: ./.github/workflows/workflow-frontend-test.yml
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
@@ -49,7 +41,7 @@ jobs:
name: Backend - Tests
needs: file-changes
if: "needs.file-changes.outputs.backend == 'true'"
uses: ./.github/workflows/workflow-backend-test.yml
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
@@ -58,21 +50,8 @@ jobs:
e2e-tests:
name: E2E - Tests
uses: ./.github/workflows/e2e.yml
uses: kestra-io/actions/.github/workflows/kestra-oss-e2e-tests.yml@main
end:
name: End
runs-on: ubuntu-latest
if: always()
needs: [frontend, backend]
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ":github-actions:"
channel: "C02DQ1A7JLR"
generate-pull-request-docker-image:
name: Generate PR docker image
uses: kestra-io/actions/.github/workflows/kestra-oss-pullrequest-publish-docker.yml@main

.github/workflows/release-docker.yml

@@ -0,0 +1,41 @@
name: Publish docker
on:
workflow_dispatch:
inputs:
retag-latest:
description: 'Retag latest Docker images'
required: true
type: boolean
default: false
retag-lts:
description: 'Retag LTS Docker images'
required: true
type: boolean
default: false
plugin-version:
description: '(deprecated) Plugin version window for old Kestra releases using the .plugins file (0.22 to 0.24). If omitted, the plugin list will be fetched from the API compatible-versions endpoint'
required: false
type: string
default: "[0.24,1.0)"
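# Maven version-range syntax: from 0.24 inclusive up to, but excluding, 1.0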
dry-run:
description: 'Dry run mode that will not write or release anything'
required: true
type: boolean
default: false
jobs:
publish-docker:
name: Publish Docker
if: startsWith(github.ref, 'refs/tags/v')
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
with:
plugin-version: ${{ inputs.plugin-version }}
retag-latest: ${{ inputs.retag-latest }}
retag-lts: ${{ inputs.retag-lts }}
dry-run: ${{ inputs.dry-run }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}


@@ -1,60 +0,0 @@
name: Set Version and Tag Plugins
on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.0)'
required: true
type: string
dryRun:
description: 'Use DRY_RUN mode'
required: false
default: 'false'
jobs:
tag:
name: Release plugins
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v4
with:
fetch-depth: 0
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins-list
with:
plugin-version: 'LATEST'
- name: 'Configure Git'
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
# Execute
- name: Set Version and Tag Plugins
if: ${{ github.event.inputs.dryRun == 'false' }}
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/setversion-tag-plugins.sh;
./dev-tools/setversion-tag-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--yes \
${{ steps.plugins-list.outputs.repositories }}
- name: Set Version and Tag Plugins (DRY_RUN)
if: ${{ github.event.inputs.dryRun == 'true' }}
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/setversion-tag-plugins.sh;
./dev-tools/setversion-tag-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--dry-run \
--yes \
${{ steps.plugins-list.outputs.repositories }}


@@ -1,57 +0,0 @@
name: Set Version and Tag
run-name: "Set version and Tag Kestra to ${{ github.event.inputs.releaseVersion }} 🚀"
on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.1)'
required: true
type: string
env:
RELEASE_VERSION: "${{ github.event.inputs.releaseVersion }}"
jobs:
release:
name: Release Kestra
runs-on: ubuntu-latest
if: startsWith(github.ref, 'refs/heads/releases/v')
steps:
# Checks
- name: Check Inputs
run: |
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)(\.[0-9]+)(-rc[0-9])?(-SNAPSHOT)?$ ]]; then
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)(\.[0-9]+)(-rc[0-9])?(-SNAPSHOT)?$"
exit 1
fi
# Extract the major and minor versions
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
RELEASE_BRANCH="refs/heads/releases/v${BASE_VERSION}.x"
CURRENT_BRANCH="$GITHUB_REF"
if ! [[ "$CURRENT_BRANCH" == "$RELEASE_BRANCH" ]]; then
echo "Invalid release branch. Expected $RELEASE_BRANCH, was $CURRENT_BRANCH"
exit 1
fi
# Checkout
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Configure Git
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
# Execute
- name: Run Gradle Release
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
# Update version
sed -i "s/^version=.*/version=$RELEASE_VERSION/" ./gradle.properties
git add ./gradle.properties
git commit -m"chore(version): update to version '$RELEASE_VERSION'"
git push
git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
git push --tags


@@ -17,24 +17,16 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
- uses: kestra-io/actions/composite/setup-build@main
id: build
with:
java-enabled: true
node-enabled: true
caches-enabled: true
# Npm
- name: Npm - Install
@@ -56,92 +48,3 @@ jobs:
with:
name: dependency-check-report
path: build/reports/dependency-check-report.html
develop-image-check:
name: Image Check (develop)
runs-on: ubuntu-latest
permissions:
contents: read
security-events: write
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: false
node-enabled: false
caches-enabled: true
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@0.32.0
with:
image-ref: kestra/kestra:develop
format: 'template'
template: '@/contrib/sarif.tpl'
severity: 'CRITICAL,HIGH'
output: 'trivy-results.sarif'
skip-dirs: /app/plugins
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
category: docker-
latest-image-check:
name: Image Check (latest)
runs-on: ubuntu-latest
permissions:
contents: read
security-events: write
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: false
node-enabled: false
caches-enabled: true
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@0.32.0
with:
image-ref: kestra/kestra:latest
format: table
skip-dirs: /app/plugins
scanners: vuln
severity: 'CRITICAL,HIGH'
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'


@@ -1,142 +0,0 @@
name: Backend - Tests
on:
workflow_call:
secrets:
GITHUB_AUTH_TOKEN:
description: "The GitHub Token."
required: true
CODECOV_TOKEN:
description: 'Codecov Token'
required: true
SONAR_TOKEN:
description: 'Sonar Token'
required: true
GOOGLE_SERVICE_ACCOUNT:
description: 'Google Service Account'
required: true
permissions:
contents: write
checks: write
actions: read
jobs:
test:
name: Backend - Tests
runs-on: ubuntu-latest
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
steps:
- uses: actions/checkout@v4
name: Checkout - Current ref
with:
fetch-depth: 0
# Setup build
- uses: kestra-io/actions/.github/actions/setup-build@main
name: Setup - Build
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
# Services
- name: Setup - Start docker compose
shell: bash
run: docker compose -f docker-compose-ci.yml up -d
# Gradle check
- name: Gradle - Build
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
shell: bash
run: |
echo $GOOGLE_SERVICE_ACCOUNT | base64 -d > ~/.gcp-service-account.json
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
./gradlew check javadoc --parallel
# report test
- name: Test - Publish Test Results
uses: dorny/test-reporter@v2
if: always()
with:
name: Java Tests Report
reporter: java-junit
path: '**/build/test-results/test/TEST-*.xml'
list-suites: 'failed'
list-tests: 'failed'
fail-on-error: 'false'
token: ${{ secrets.GITHUB_AUTH_TOKEN }}
# Sonar
- name: Test - Analyze with Sonar
if: env.SONAR_TOKEN != ''
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
shell: bash
run: ./gradlew sonar --info
# GCP
- name: GCP - Auth with unit test account
id: auth
if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
uses: "google-github-actions/auth@v2"
with:
credentials_json: "${{ secrets.GOOGLE_SERVICE_ACCOUNT }}"
- name: GCP - Setup Cloud SDK
if: env.GOOGLE_SERVICE_ACCOUNT != ''
uses: "google-github-actions/setup-gcloud@v2"
# Allure check
- uses: rlespinasse/github-slug-action@v5
name: Allure - Generate slug variables
- name: Allure - Publish report
uses: andrcuns/allure-publish-action@v2.9.0
if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
env:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
JAVA_HOME: /usr/lib/jvm/default-jvm/
with:
storageType: gcs
resultsGlob: "**/build/allure-results"
bucket: internal-kestra-host
baseUrl: "https://internal.dev.kestra.io"
prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
copyLatest: true
ignoreMissingResults: true
# Jacoco
- name: Jacoco - Copy reports
if: env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
shell: bash
run: |
mv build/reports/jacoco/testCodeCoverageReport build/reports/jacoco/test/
mv build/reports/jacoco/test/testCodeCoverageReport.xml build/reports/jacoco/test/jacocoTestReport.xml
gsutil -m rsync -d -r build/reports/jacoco/test/ gs://internal-kestra-host/${{ format('{0}/{1}', github.repository, 'jacoco') }}
# Codecov
- name: Codecov - Upload coverage reports
uses: codecov/codecov-action@v5
if: ${{ !cancelled() }}
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: backend
- name: Codecov - Upload test results
uses: codecov/test-results-action@v1
if: ${{ !cancelled() }}
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: backend


@@ -1,152 +0,0 @@
name: Build Artifacts
on:
workflow_call:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: true
type: string
outputs:
docker-tag:
value: ${{ jobs.build.outputs.docker-tag }}
description: "The Docker image Tag for Kestra"
docker-artifact-name:
value: ${{ jobs.build.outputs.docker-artifact-name }}
description: "The GitHub artifact containing the Kestra docker image name."
plugins:
value: ${{ jobs.build.outputs.plugins }}
description: "The Kestra plugins list used for the build."
jobs:
build:
name: Build - Artifacts
runs-on: ubuntu-latest
outputs:
docker-tag: ${{ steps.vars.outputs.tag }}
docker-artifact-name: ${{ steps.vars.outputs.artifact }}
plugins: ${{ steps.plugins.outputs.plugins }}
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
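# GitHub Actions expressions have no ternary operator; the "a && b || c" pattern above is the usual emulation (evaluates to b when a is truthy, otherwise c)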
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
with:
fetch-depth: 0
# Npm
- name: Setup - Npm install
shell: bash
working-directory: ui
run: npm ci
# Setup build
- uses: kestra-io/actions/.github/actions/setup-build@main
name: Setup - Build
id: build
with:
java-enabled: true
node-enabled: true
# Get Plugins List
- name: Plugins - Get List
uses: ./.github/actions/plugins-list
if: "!startsWith(github.ref, 'refs/tags/v')"
id: plugins-list
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
# Set Plugins List
- name: Plugins - Set List
id: plugins
if: "!startsWith(github.ref, 'refs/tags/v')"
shell: bash
run: |
PLUGINS="${{ steps.plugins-list.outputs.plugins }}"
TAG=${GITHUB_REF#refs/*/}
if [[ $TAG = "master" || $TAG == v* ]]; then
echo "plugins=$PLUGINS" >> $GITHUB_OUTPUT
else
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
fi
# Build
- name: Gradle - Build
shell: bash
run: |
./gradlew executableJar
- name: Artifacts - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker Tag
- name: Setup - Docker vars
id: vars
shell: bash
run: |
TAG=${GITHUB_REF#refs/*/}
if [[ $TAG = "master" ]]
then
TAG="latest";
elif [[ $TAG = "develop" ]]
then
TAG="develop";
elif [[ $TAG = v* ]]
then
TAG="${TAG}";
else
TAG="build-${{ github.run_id }}";
fi
echo "tag=${TAG}" >> $GITHUB_OUTPUT
echo "artifact=docker-kestra-${TAG}" >> $GITHUB_OUTPUT
# Docker setup
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Docker - Setup Buildx
uses: docker/setup-buildx-action@v3
# Docker Build
- name: Docker - Build & export image
uses: docker/build-push-action@v6
if: "!startsWith(github.ref, 'refs/tags/v')"
with:
context: .
push: false
file: Dockerfile
tags: |
kestra/kestra:${{ steps.vars.outputs.tag }}
build-args: |
KESTRA_PLUGINS=${{ steps.plugins.outputs.plugins }}
APT_PACKAGES=${{ env.DOCKER_APT_PACKAGES }}
PYTHON_LIBRARIES=${{ env.DOCKER_PYTHON_LIBRARIES }}
outputs: type=docker,dest=/tmp/${{ steps.vars.outputs.artifact }}.tar
# Upload artifacts
- name: Artifacts - Upload JAR
uses: actions/upload-artifact@v4
with:
name: jar
path: build/libs/
- name: Artifacts - Upload Executable
uses: actions/upload-artifact@v4
with:
name: exe
path: build/executable/
- name: Artifacts - Upload Docker
uses: actions/upload-artifact@v4
if: "!startsWith(github.ref, 'refs/tags/v')"
with:
name: ${{ steps.vars.outputs.artifact }}
path: /tmp/${{ steps.vars.outputs.artifact }}.tar


@@ -1,70 +0,0 @@
name: Frontend - Tests
on:
workflow_call:
secrets:
GITHUB_AUTH_TOKEN:
description: "The GitHub Token."
required: true
CODECOV_TOKEN:
description: 'Codecov Token'
required: true
env:
# to save corepack from itself
COREPACK_INTEGRITY_KEYS: 0
jobs:
test:
name: Frontend - Tests
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Cache Node Modules
id: cache-node-modules
uses: actions/cache@v4
with:
path: |
ui/node_modules
key: modules-${{ hashFiles('ui/package-lock.json') }}
- name: Cache Playwright Binaries
id: cache-playwright
uses: actions/cache@v4
with:
path: |
~/.cache/ms-playwright
key: playwright-${{ hashFiles('ui/package-lock.json') }}
- name: Npm - install
if: steps.cache-node-modules.outputs.cache-hit != 'true'
working-directory: ui
run: npm ci
- name: Npm - lint
uses: reviewdog/action-eslint@v1
with:
github_token: ${{ secrets.GITHUB_AUTH_TOKEN }}
reporter: github-pr-review
workdir: ui
- name: Npm - Run build
working-directory: ui
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
run: npm run build
- name: Run front-end unit tests
working-directory: ui
run: npm run test:unit -- --coverage
- name: Storybook - Install Playwright
working-directory: ui
if: steps.cache-playwright.outputs.cache-hit != 'true'
run: npx playwright install --with-deps
- name: Run storybook component tests
working-directory: ui
run: npm run test:storybook -- --coverage


@@ -1,78 +0,0 @@
name: Github - Release
on:
workflow_call:
secrets:
GH_PERSONAL_TOKEN:
description: "The Github personal token."
required: true
push:
tags:
- '*'
jobs:
publish:
name: Github - Release
runs-on: ubuntu-latest
steps:
# Check out
- name: Checkout - Repository
uses: actions/checkout@v4
with:
fetch-depth: 0
submodules: true
# Checkout GitHub Actions
- name: Checkout - Actions
uses: actions/checkout@v4
with:
repository: kestra-io/actions
sparse-checkout-cone-mode: true
path: actions
sparse-checkout: |
.github/actions
# Download Exec
# Must be done after checkout actions
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
if: startsWith(github.ref, 'refs/tags/v')
with:
name: exe
path: build/executable
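# sort -V below performs a version-aware sort, so the last line is the highest semver tag; this decides whether the GitHub release is marked "latest"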
- name: Check if current tag is latest
id: is_latest
run: |
latest_tag=$(git tag | grep -E '^v[0-9]+\.[0-9]+\.[0-9]+$' | sed 's/^v//' | sort -V | tail -n1)
current_tag="${GITHUB_REF_NAME#v}"
if [ "$current_tag" = "$latest_tag" ]; then
echo "latest=true" >> $GITHUB_OUTPUT
else
echo "latest=false" >> $GITHUB_OUTPUT
fi
env:
GITHUB_REF_NAME: ${{ github.ref_name }}
# GitHub Release
- name: Create GitHub release
uses: ./actions/.github/actions/github-release
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
env:
MAKE_LATEST: ${{ steps.is_latest.outputs.latest }}
GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
# Trigger gha workflow to bump helm chart version
- name: GitHub - Trigger the Helm chart version bump
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/helm-charts
event-type: update-helm-chart-version
client-payload: |-
{
"new_version": "${{ github.ref_name }}",
"github_repository": "${{ github.repository }}",
"github_actor": "${{ github.actor }}"
}


@@ -1,146 +0,0 @@
name: Publish - Docker
on:
workflow_dispatch:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: false
type: string
force-download-artifact:
description: 'Force download artifact'
required: false
type: string
default: "true"
workflow_call:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: false
type: string
force-download-artifact:
description: 'Force download artifact'
required: false
type: string
default: "true"
secrets:
DOCKERHUB_USERNAME:
description: "The Dockerhub username."
required: true
DOCKERHUB_PASSWORD:
description: "The Dockerhub password."
required: true
jobs:
# ********************************************************************************************************************
# Build
# ********************************************************************************************************************
build-artifacts:
name: Build Artifacts
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
uses: ./.github/workflows/workflow-build-artifacts.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
# ********************************************************************************************************************
# Docker
# ********************************************************************************************************************
publish:
name: Publish - Docker
runs-on: ubuntu-latest
needs: build-artifacts
if: |
always() &&
(needs.build-artifacts.result == 'success' ||
github.event.inputs.force-download-artifact != 'true')
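# always() keeps this job eligible even when build-artifacts is skipped; it then runs if the build succeeded or if artifacts were not requested at all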
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
strategy:
matrix:
image:
- tag: -no-plugins
packages: jattach
plugins: false
python-libraries: ""
- tag: ""
plugins: true
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
python-libraries: kestra
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
# Docker setup
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Docker - Setup Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Docker - Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# Get Plugins List
- name: Plugins - Get List
uses: ./.github/actions/plugins-list
id: plugins-list
if: ${{ matrix.image.plugins }}
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
# Vars
- name: Docker - Set variables
shell: bash
id: vars
run: |
TAG=${GITHUB_REF#refs/*/}
PLUGINS="${{ matrix.image.plugins == true && steps.plugins-list.outputs.plugins || '' }}"
if [[ $TAG == v* ]]; then
TAG="${TAG}";
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
elif [[ $TAG = "develop" ]]; then
TAG="develop";
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
else
TAG="build-${{ github.run_id }}";
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
fi
echo "tag=${TAG}${{ matrix.image.tag }}" >> $GITHUB_OUTPUT
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
with:
name: exe
path: build/executable
- name: Docker - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker Build and push
- name: Docker - Build image
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: kestra/kestra:${{ steps.vars.outputs.tag }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
APT_PACKAGES=${{ matrix.image.packages }}
PYTHON_LIBRARIES=${{ matrix.image.python-libraries }}


@@ -1,57 +0,0 @@
name: Publish - Maven
on:
workflow_call:
secrets:
SONATYPE_USER:
description: "The Sonatype username."
required: true
SONATYPE_PASSWORD:
description: "The Sonatype password."
required: true
SONATYPE_GPG_KEYID:
description: "The Sonatype GPG key id."
required: true
SONATYPE_GPG_PASSWORD:
description: "The Sonatype GPG password."
required: true
SONATYPE_GPG_FILE:
description: "The Sonatype GPG file."
required: true
jobs:
publish:
name: Publish - Maven
runs-on: ubuntu-latest
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
# Setup build
- name: Setup - Build
uses: kestra-io/actions/.github/actions/setup-build@main
id: build
with:
java-enabled: true
node-enabled: true
# Publish
- name: Publish - Release package to Maven Central
shell: bash
env:
ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
run: |
mkdir -p ~/.gradle/
echo "signing.keyId=${SONATYPE_GPG_KEYID}" > ~/.gradle/gradle.properties
echo "signing.password=${SONATYPE_GPG_PASSWORD}" >> ~/.gradle/gradle.properties
echo "signing.secretKeyRingFile=${HOME}/.gradle/secring.gpg" >> ~/.gradle/gradle.properties
echo ${SONATYPE_GPG_FILE} | base64 -d > ~/.gradle/secring.gpg
./gradlew publishToMavenCentral
# Gradle dependency
- name: Java - Gradle dependency graph
uses: gradle/actions/dependency-submission@v4


@@ -1,80 +0,0 @@
name: Release
on:
workflow_dispatch:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: false
type: string
publish-docker:
description: "Publish Docker image"
default: 'false'
required: false
type: string
workflow_call:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: false
type: string
secrets:
DOCKERHUB_USERNAME:
description: "The Dockerhub username."
required: true
DOCKERHUB_PASSWORD:
description: "The Dockerhub password."
required: true
SONATYPE_USER:
description: "The Sonatype username."
required: true
SONATYPE_PASSWORD:
description: "The Sonatype password."
required: true
SONATYPE_GPG_KEYID:
description: "The Sonatype GPG key id."
required: true
SONATYPE_GPG_PASSWORD:
description: "The Sonatype GPG password."
required: true
SONATYPE_GPG_FILE:
description: "The Sonatype GPG file."
required: true
jobs:
build-artifacts:
name: Build - Artifacts
uses: ./.github/workflows/workflow-build-artifacts.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
Docker:
name: Publish Docker
needs: build-artifacts
uses: ./.github/workflows/workflow-publish-docker.yml
if: startsWith(github.ref, 'refs/heads/develop') || github.event.inputs.publish-docker == 'true'
with:
force-download-artifact: 'false'
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
Maven:
name: Publish Maven
uses: ./.github/workflows/workflow-publish-maven.yml
secrets:
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
Github:
name: Github Release
needs: build-artifacts
if: startsWith(github.ref, 'refs/tags/v')
uses: ./.github/workflows/workflow-github-release.yml
secrets:
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}


@@ -1,97 +0,0 @@
name: Tests
on:
schedule:
- cron: '0 4 * * 1,2,3,4,5'
workflow_call:
inputs:
report-status:
description: "Report status of the jobs in outputs"
type: string
required: false
default: false
outputs:
frontend_status:
description: "Status of the frontend job"
value: ${{ jobs.set-frontend-status.outputs.frontend_status }}
backend_status:
description: "Status of the backend job"
value: ${{ jobs.set-backend-status.outputs.backend_status }}
jobs:
file-changes:
name: File changes detection
runs-on: ubuntu-latest
timeout-minutes: 60
outputs:
ui: ${{ steps.changes.outputs.ui }}
backend: ${{ steps.changes.outputs.backend }}
steps:
- uses: actions/checkout@v4
if: "!startsWith(github.ref, 'refs/tags/v')"
- uses: dorny/paths-filter@v3
if: "!startsWith(github.ref, 'refs/tags/v')"
id: changes
with:
filters: |
ui:
- 'ui/**'
backend:
- '!{ui,.github}/**'
token: ${{ secrets.GITHUB_TOKEN }}
frontend:
name: Frontend - Tests
needs: file-changes
if: "needs.file-changes.outputs.ui == 'true' || startsWith(github.ref, 'refs/tags/v')"
uses: ./.github/workflows/workflow-frontend-test.yml
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
backend:
name: Backend - Tests
needs: file-changes
if: "needs.file-changes.outputs.backend == 'true' || startsWith(github.ref, 'refs/tags/v')"
uses: ./.github/workflows/workflow-backend-test.yml
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
# Output every job status
# To be used in other workflows
report-status:
name: Report Status
runs-on: ubuntu-latest
needs: [ frontend, backend ]
if: always() && (inputs.report-status == 'true')
outputs:
frontend_status: ${{ steps.set-frontend-status.outputs.frontend_status }}
backend_status: ${{ steps.set-backend-status.outputs.backend_status }}
steps:
- id: set-frontend-status
name: Set frontend job status
run: echo "::set-output name=frontend_status::${{ needs.frontend.result }}"
- id: set-backend-status
name: Set backend job status
run: echo "::set-output name=backend_status::${{ needs.backend.result }}"
notify:
name: Notify - Slack
runs-on: ubuntu-latest
needs: [ frontend, backend ]
if: github.event_name == 'schedule'
steps:
- name: Notify failed CI
id: send-ci-failed
if: |
always() && (needs.frontend.result != 'success' ||
needs.backend.result != 'success')
uses: kestra-io/actions/.github/actions/send-ci-failed@main
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}


@@ -87,13 +87,18 @@
#plugin-powerbi:io.kestra.plugin:plugin-powerbi:LATEST
#plugin-pulsar:io.kestra.plugin:plugin-pulsar:LATEST
#plugin-redis:io.kestra.plugin:plugin-redis:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-bun:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-deno:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-go:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-groovy:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-jbang:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-julia:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-jython:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-lua:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-nashorn:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-node:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-perl:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-php:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-powershell:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-python:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-r:LATEST

Dockerfile.pr

@@ -0,0 +1,8 @@
ARG KESTRA_DOCKER_BASE_VERSION=develop
FROM kestra/kestra:$KESTRA_DOCKER_BASE_VERSION
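# overlay the PR-built files onto an existing Kestra base image (defaults to the develop tag) instead of rebuilding from scratch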
USER root
COPY --chown=kestra:kestra docker /
USER kestra


@@ -205,23 +205,59 @@ subprojects {
testImplementation 'org.assertj:assertj-core'
}
test {
useJUnitPlatform()
def commonTestConfig = { Test t ->
// set Xmx for test workers
maxHeapSize = '4g'
t.maxHeapSize = '4g'
// configure en_US default locale for tests
systemProperty 'user.language', 'en'
systemProperty 'user.country', 'US'
t.systemProperty 'user.language', 'en'
t.systemProperty 'user.country', 'US'
environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
environment 'SECRET_NON_B64_SECRET', "some secret value"
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
environment 'ENV_TEST1', "true"
environment 'ENV_TEST2', "Pass by env"
t.environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
t.environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
t.environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
t.environment 'SECRET_NON_B64_SECRET', "some secret value"
t.environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
t.environment 'ENV_TEST1', "true"
t.environment 'ENV_TEST2', "Pass by env"
}
tasks.register('flakyTest', Test) { Test t ->
group = 'verification'
description = 'Runs tests tagged @Flaky but does not fail the build.'
useJUnitPlatform {
includeTags 'flaky'
}
ignoreFailures = true
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
}
commonTestConfig(t)
}
test {
useJUnitPlatform {
excludeTags 'flaky'
}
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
commonTestConfig(it)
finalizedBy(tasks.named('flakyTest'))
}
testlogger {
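For reference, the flakyTest task registered above selects tests by JUnit 5 tag. A minimal sketch of how a test would opt in, assuming JUnit 5's @Tag annotation (the class and method names here are hypothetical):

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

class ExampleFlakyTest {

    // matched by includeTags 'flaky' in the flakyTest task above,
    // and excluded from the main test task by excludeTags 'flaky'
    @Test
    @Tag("flaky")
    void sometimesFailsUnderLoad() {
        // test body
    }
}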


@@ -4,10 +4,13 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.IndexerInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.services.SkipExecutionService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@CommandLine.Command(
@@ -17,6 +20,11 @@ import java.util.Map;
public class IndexerCommand extends AbstractServerCommand {
@Inject
private ApplicationContext applicationContext;
@Inject
private SkipExecutionService skipExecutionService;
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a comma; for troubleshooting purposes only")
private List<String> skipIndexerRecords = Collections.emptyList();
@SuppressWarnings("unused")
public static Map<String, Object> propertiesOverrides() {
@@ -27,6 +35,8 @@ public class IndexerCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
this.skipExecutionService.setSkipIndexerRecords(skipIndexerRecords);
super.call();
IndexerInterface indexer = applicationContext.getBean(IndexerInterface.class);


@@ -63,6 +63,9 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a comma; for troubleshooting purposes only")
private List<String> skipTenants = Collections.emptyList();
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a comma; for troubleshooting purposes only")
private List<String> skipIndexerRecords = Collections.emptyList();
@CommandLine.Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
boolean tutorialsDisabled = false;
@@ -93,6 +96,7 @@ public class StandAloneCommand extends AbstractServerCommand {
this.skipExecutionService.setSkipFlows(skipFlows);
this.skipExecutionService.setSkipNamespaces(skipNamespaces);
this.skipExecutionService.setSkipTenants(skipTenants);
this.skipExecutionService.setSkipIndexerRecords(skipIndexerRecords);
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);
KestraContext.getContext().injectWorkerConfigs(workerThread, null);


@@ -5,12 +5,15 @@ import io.kestra.core.models.ServerType;
import io.kestra.core.runners.IndexerInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.services.SkipExecutionService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Option;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -28,11 +31,17 @@ public class WebServerCommand extends AbstractServerCommand {
@Inject
private ExecutorsUtils executorsUtils;
@Inject
private SkipExecutionService skipExecutionService;
@Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
boolean tutorialsDisabled = false;
private boolean tutorialsDisabled = false;
@Option(names = {"--no-indexer"}, description = "Flag to disable starting an embedded indexer.")
boolean indexerDisabled = false;
private boolean indexerDisabled = false;
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a comma; for troubleshooting purposes only")
private List<String> skipIndexerRecords = Collections.emptyList();
@Override
public boolean isFlowAutoLoadEnabled() {
@@ -48,6 +57,8 @@ public class WebServerCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
this.skipExecutionService.setSkipIndexerRecords(skipIndexerRecords);
super.call();
// start the indexer


@@ -18,6 +18,10 @@ micronaut:
root:
paths: classpath:root
mapping: /**
codec:
json:
additional-types:
- application/scim+json
server:
max-request-size: 10GB
multipart:
@@ -26,15 +30,15 @@ micronaut:
read-idle-timeout: 60m
write-idle-timeout: 60m
idle-timeout: 60m
netty:
max-zstd-encode-size: 67108864 # increased to 64MB from the default of 32MB
max-chunk-size: 10MB
max-header-size: 32768 # increased from the default of 8k
responses:
file:
cache-seconds: 86400
cache-control:
public: true
netty:
max-zstd-encode-size: 67108864 # increased to 64MB from the default of 32MB
max-chunk-size: 10MB
max-header-size: 32768 # increased from the default of 8k
# Access log configuration, see https://docs.micronaut.io/latest/guide/index.html#accessLogger
access-logger:
@@ -78,8 +82,19 @@ micronaut:
type: scheduled
core-pool-size: 1
# Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
metrics:
binders:
retry:
enabled: true
netty:
queues:
enabled: true
bytebuf-allocators:
enabled: true
channels:
enabled: true
# Disable OpenTelemetry metrics by default, users that need it must enable it and configure the collector URL.
export:
otlp:
enabled: false
@@ -92,6 +107,8 @@ jackson:
serialization-inclusion: non_null
deserialization:
FAIL_ON_UNKNOWN_PROPERTIES: false
mapper:
ACCEPT_CASE_INSENSITIVE_ENUMS: true
endpoints:
all:
@@ -100,6 +117,10 @@ endpoints:
sensitive: false
health:
details-visible: ANONYMOUS
disk-space:
enabled: false
discovery-client:
enabled: false
loggers:
write-sensitive: false
env:
@@ -133,12 +154,46 @@ kestra:
tutorial-flows:
# Automatically loads all tutorial flows at startup.
enabled: true
retries:
attempts: 5
multiplier: 2.0
delay: 1s
maxDelay: ""
server:
basic-auth:
# These URLs will not be authenticated; by default we open some of the Micronaut default endpoints, but not all of them, for security reasons
open-urls:
- "/ping"
- "/api/v1/executions/webhook/"
- "/api/v1/main/executions/webhook/"
- "/api/v1/*/executions/webhook/"
preview:
initial-rows: 100
max-rows: 5000
# The expected time for this server to complete all its tasks before initiating a graceful shutdown.
terminationGracePeriod: 5m
workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
# Configuration for Liveness and Heartbeat mechanism between servers.
liveness:
enabled: true
# The expected time between liveness probe.
interval: 10s
# The timeout used to detect service failures.
timeout: 1m
# The time to wait before executing a liveness probe.
initialDelay: 1m
# The expected time between service heartbeats.
heartbeatInterval: 3s
service:
purge:
initial-delay: 1h
fixed-delay: 1d
retention: 30d
jdbc:
queues:
min-poll-interval: 25ms
@@ -150,7 +205,7 @@ kestra:
fixed-delay: 1h
retention: 7d
types:
- type : io.kestra.core.models.executions.LogEntry
- type: io.kestra.core.models.executions.LogEntry
retention: 1h
- type: io.kestra.core.models.executions.MetricEntry
retention: 1h
@@ -182,37 +237,12 @@ kestra:
traces:
root: DISABLED
server:
basic-auth:
# These URLs will not be authenticated, by default we open some of the Micronaut default endpoints but not all for security reasons
open-urls:
- "/ping"
- "/api/v1/executions/webhook/"
preview:
initial-rows: 100
max-rows: 5000
# The expected time for this server to complete all its tasks before initiating a graceful shutdown.
terminationGracePeriod: 5m
workerTaskRestartStrategy: AFTER_TERMINATION_GRACE_PERIOD
# Configuration for Liveness and Heartbeat mechanism between servers.
liveness:
enabled: true
# The expected time between liveness probe.
interval: 10s
# The timeout used to detect service failures.
timeout: 1m
# The time to wait before executing a liveness probe.
initialDelay: 1m
# The expected time between service heartbeats.
heartbeatInterval: 3s
service:
purge:
initial-delay: 1h
fixed-delay: 1d
retention: 30d
ui-anonymous-usage-report:
enabled: true
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/usages
uri: https://api.kestra.io/v1/reports/server-events
initial-delay: 5m
fixed-delay: 1h


@@ -63,6 +63,10 @@ dependencies {
exclude group: 'com.fasterxml.jackson.core'
}
// micrometer
implementation "io.micronaut.micrometer:micronaut-micrometer-observation"
implementation 'io.micrometer:micrometer-java21'
// test
testAnnotationProcessor project(':processor')
testImplementation project(':tests')


@@ -122,12 +122,13 @@ public class JsonSchemaGenerator {
if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
.map(JsonNode::asText)
.toList();
.collect(Collectors.toList());
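// Collectors.toList() (unlike Stream.toList()) yields a mutable list, which is required because entries are removed below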
properties.fields().forEachRemaining(e -> {
int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
requiredPropsNode.remove(indexInRequiredArray);
requiredFieldValues.remove(indexInRequiredArray);
}
});


@@ -6,8 +6,14 @@ import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.http.client.apache.*;
import io.kestra.core.http.client.configurations.HttpConfiguration;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.micrometer.common.KeyValues;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.ApacheHttpClientContext;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.DefaultApacheHttpClientObservationConvention;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.ObservationExecChainHandler;
import io.micrometer.observation.ObservationRegistry;
import io.micronaut.http.MediaType;
import jakarta.annotation.Nullable;
import lombok.Builder;
@@ -16,6 +22,7 @@ import org.apache.commons.lang3.ArrayUtils;
import org.apache.hc.client5.http.ContextBuilder;
import org.apache.hc.client5.http.auth.*;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.impl.ChainElement;
import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
@@ -50,11 +57,16 @@ public class HttpClient implements Closeable {
private transient CloseableHttpClient client;
private final RunContext runContext;
private final HttpConfiguration configuration;
private ObservationRegistry observationRegistry;
@Builder
public HttpClient(RunContext runContext, @Nullable HttpConfiguration configuration) throws IllegalVariableEvaluationException {
this.runContext = runContext;
this.configuration = configuration == null ? HttpConfiguration.builder().build() : configuration;
if (runContext instanceof DefaultRunContext defaultRunContext) {
this.observationRegistry = defaultRunContext.getApplicationContext().findBean(ObservationRegistry.class).orElse(null);
}
this.client = this.createClient();
}
@@ -67,6 +79,13 @@ public class HttpClient implements Closeable {
.disableDefaultUserAgent()
.setUserAgent("Kestra");
if (observationRegistry != null) {
// Micrometer: must be placed before the retry strategy (see https://docs.micrometer.io/micrometer/reference/reference/httpcomponents.html#_retry_strategy_considerations)
builder.addExecInterceptorAfter(ChainElement.RETRY.name(), "micrometer",
new ObservationExecChainHandler(observationRegistry, new CustomApacheHttpClientObservationConvention())
);
}
// logger
if (this.configuration.getLogs() != null && this.configuration.getLogs().length > 0) {
if (ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.REQUEST_HEADERS) ||
@@ -297,4 +316,14 @@ public class HttpClient implements Closeable {
this.client.close();
}
}
public static class CustomApacheHttpClientObservationConvention extends DefaultApacheHttpClientObservationConvention {
@Override
public KeyValues getLowCardinalityKeyValues(ApacheHttpClientContext context) {
return KeyValues.concat(
super.getLowCardinalityKeyValues(context),
KeyValues.of("type", "core-client")
);
}
}
}


@@ -0,0 +1,34 @@
package io.kestra.core.metrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadDeadlockMetrics;
import io.micrometer.java21.instrument.binder.jdk.VirtualThreadMetrics;
import io.micronaut.configuration.metrics.annotation.RequiresMetrics;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import static io.micronaut.configuration.metrics.micrometer.MeterRegistryFactory.MICRONAUT_METRICS_BINDERS;
import static io.micronaut.core.util.StringUtils.FALSE;
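// Registers additional JVM meter binders; notEquals = FALSE keeps them enabled unless micronaut.metrics.binders.jvm.enabled is explicitly set to false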
@Factory
@RequiresMetrics
public class MeterRegistryBinderFactory {
@Bean
@Primary
@Singleton
@Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
public VirtualThreadMetrics virtualThreadMetrics() {
return new VirtualThreadMetrics();
}
@Bean
@Primary
@Singleton
@Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm.enabled", notEquals = FALSE)
public JvmThreadDeadlockMetrics threadDeadlockMetricsMetrics() {
return new JvmThreadDeadlockMetrics();
}
}


@@ -1,13 +1,15 @@
package io.kestra.core.models;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.NotEmpty;
import java.util.*;
import java.util.stream.Collectors;
public record Label(@NotNull String key, @NotNull String value) {
@Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.")
public record Label(@NotEmpty String key, @NotEmpty String value) {
public static final String SYSTEM_PREFIX = "system.";
// system labels


@@ -272,7 +272,7 @@ public class Execution implements DeletedInterface, TenantInterface {
}
public Execution withTaskRun(TaskRun taskRun) throws InternalException {
ArrayList<TaskRun> newTaskRunList = new ArrayList<>(this.taskRunList);
ArrayList<TaskRun> newTaskRunList = this.taskRunList == null ? new ArrayList<>() : new ArrayList<>(this.taskRunList);
boolean b = Collections.replaceAll(
newTaskRunList,
@@ -1040,6 +1040,16 @@ public class Execution implements DeletedInterface, TenantInterface {
return result;
}
/**
* Find all direct children of the given parent {@link TaskRun}.
*/
public List<TaskRun> findChildren(TaskRun parentTaskRun) {
return taskRunList.stream()
.filter(taskRun -> parentTaskRun.getId().equals(taskRun.getParentTaskRunId()))
.toList();
}
public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
return (withCurrent ?
Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :


@@ -3,7 +3,6 @@ package io.kestra.core.models.flows;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
@@ -59,7 +58,14 @@ public abstract class AbstractFlow implements FlowInterface {
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(implementation = Object.class, oneOf = {List.class, Map.class})
@Schema(
description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
oneOf = {
Label[].class,
Map.class
}
)
@Valid
List<Label> labels;
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
@@ -67,4 +73,5 @@ public abstract class AbstractFlow implements FlowInterface {
@Valid
private WorkerGroup workerGroup;
}


@@ -116,7 +116,7 @@ public class State {
}
public Instant maxDate() {
if (this.histories.size() == 0) {
if (this.histories.isEmpty()) {
return Instant.now();
}
@@ -124,7 +124,7 @@ public class State {
}
public Instant minDate() {
if (this.histories.size() == 0) {
if (this.histories.isEmpty()) {
return Instant.now();
}
@@ -173,6 +173,11 @@ public class State {
return this.current.isBreakpoint();
}
@JsonIgnore
public boolean isQueued() {
return this.current.isQueued();
}
@JsonIgnore
public boolean isRetrying() {
return this.current.isRetrying();
@@ -206,6 +211,14 @@ public class State {
return this.histories.get(this.histories.size() - 2).state.isPaused();
}
/**
* Return true if the execution has failed, then was restarted.
* This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
*/
public boolean failedThenRestarted() {
return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
}
@Introspected
public enum Type {
CREATED,
@@ -264,6 +277,10 @@ public class State {
return this == Type.KILLED;
}
public boolean isQueued() {
return this == Type.QUEUED;
}
/**
* @return states that are terminal to an execution
*/


@@ -68,6 +68,19 @@ public class Property<T> {
String getExpression() {
return expression;
}
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
* <p>
* The returned property will still cache its rendered result.
* To re-evaluate on a subsequent render, call {@code skipCache()} again.
*
* @return a new {@link Property} without a pre-rendered value
*/
public Property<T> skipCache() {
return Property.ofExpression(expression);
}
/**
* Build a new Property object with a value already set.<br>
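A short usage sketch for skipCache(), assuming the usual runContext.render(property).as(...) rendering API; the variable names are illustrative:

// first render evaluates the Pebble expression and caches the result
Property<String> stamp = Property.ofExpression("{{ now() }}");
String first = runContext.render(stamp).as(String.class).orElseThrow();
// rendering the same instance again returns the cached value
String cached = runContext.render(stamp).as(String.class).orElseThrow();
// skipCache() returns a fresh Property, so the expression is evaluated again
String fresh = runContext.render(stamp.skipCache()).as(String.class).orElseThrow();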


@@ -29,6 +29,7 @@ public interface QueueFactoryInterface {
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
String MULTIPLE_CONDITION_EVENT_NAMED = "multipleConditionEventQueue";
QueueInterface<Execution> execution();
@@ -61,4 +62,6 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<ExecutionRunning> executionRunning();
QueueInterface<MultipleConditionEvent> multipleConditionEvent();
}


@@ -5,6 +5,7 @@ import io.kestra.core.models.Pauseable;
import io.kestra.core.utils.Either;
import java.io.Closeable;
import java.util.List;
import java.util.function.Consumer;
public interface QueueInterface<T> extends Closeable, Pauseable {
@@ -18,7 +19,15 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
emitAsync(null, message);
}
void emitAsync(String consumerGroup, T message) throws QueueException;
default void emitAsync(String consumerGroup, T message) throws QueueException {
emitAsync(consumerGroup, List.of(message));
}
default void emitAsync(List<T> messages) throws QueueException {
emitAsync(null, messages);
}
void emitAsync(String consumerGroup, List<T> messages) throws QueueException;
default void delete(T message) throws QueueException {
delete(null, message);
@@ -27,7 +36,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
void delete(String consumerGroup, T message) throws QueueException;
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
return receive((String) null, consumer);
return receive(null, consumer, false);
}
default Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {
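A caller-side sketch of the batch overloads above (the queue and message variables are hypothetical); note that the single-message default now simply delegates to the batch variant:

// single message on the default consumer group: delegates to emitAsync(null, List.of(message))
queue.emitAsync(message);
// several messages in one call instead of a per-message emit loop
queue.emitAsync(List.of(first, second));
// the same batch, scoped to an explicit consumer group
queue.emitAsync("my-consumer-group", List.of(first, second));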


@@ -0,0 +1,12 @@
package io.kestra.core.queues;
import java.io.Serial;
public class UnsupportedMessageException extends QueueException {
@Serial
private static final long serialVersionUID = 1L;
public UnsupportedMessageException(String message, Throwable cause) {
super(message, cause);
}
}


@@ -106,6 +106,8 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
Integer purge(Execution execution);
Integer purge(List<Execution> executions);
List<DailyExecutionStatistics> dailyStatisticsForAllTenants(
@Nullable String query,
@Nullable String namespace,
@@ -161,7 +163,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
}
List<Execution> lastExecutions(
@Nullable String tenantId,
String tenantId,
@Nullable List<FlowFilter> flows
);
}


@@ -25,8 +25,8 @@ public interface FlowRepositoryInterface {
* Used only if result is used internally and not exposed to the user.
* It is useful when we want to restart/resume a flow.
*/
default Flow findByExecutionWithoutAcl(Execution execution) {
Optional<Flow> find = this.findByIdWithoutAcl(
default FlowWithSource findByExecutionWithoutAcl(Execution execution) {
Optional<FlowWithSource> find = this.findByIdWithSourceWithoutAcl(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),


@@ -94,6 +94,8 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
Integer purge(Execution execution);
Integer purge(List<Execution> executions);
void deleteByQuery(String tenantId, String executionId, String taskId, String taskRunId, Level minLevel, Integer attempt);
void deleteByQuery(String tenantId, String namespace, String flowId, String triggerId);


@@ -29,6 +29,8 @@ public interface MetricRepositoryInterface extends SaveRepositoryInterface<Metri
Integer purge(Execution execution);
Integer purge(List<Execution> executions);
Flux<MetricEntry> findAllAsync(@Nullable String tenantId);
default Function<String, String> sortMapping() throws IllegalArgumentException {


@@ -86,7 +86,7 @@ public class Executor {
public Boolean canBeProcessed() {
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint());
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
}
public Executor withFlow(FlowWithSource flow) {


@@ -237,9 +237,9 @@ public class ExecutorService {
try {
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
} catch (Exception e) {
// This will lead to the next task being still executed but at least Kestra will not crash.
// This will lead to the next task being still executed, but at least Kestra will not crash.
// This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), e);
runContext.logger().error("Unable to resolve state from the Flowable task: {}", e.getMessage(), e);
state = Optional.of(State.Type.FAILED);
}
Optional<WorkerTaskResult> endedTask = childWorkerTaskTypeToWorkerTask(
@@ -589,6 +589,23 @@ public class ExecutorService {
list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId()))
.collect(Collectors.toCollection(ArrayList::new));
}
// If the task is a flowable and it is terminated, check that all children are terminated.
// This may not be the case for parallel flowable tasks like Parallel, Dag, ForEach...
// After a failed task, some child flowables may not be correctly terminated.
if (task instanceof FlowableTask<?> && taskRun.getState().isTerminated()) {
List<TaskRun> updated = executor.getExecution().findChildren(taskRun).stream()
.filter(child -> !child.getState().isTerminated())
.map(throwFunction(child -> child.withState(taskRun.getState().getCurrent())))
.toList();
if (!updated.isEmpty()) {
Execution execution = executor.getExecution();
for (TaskRun child : updated) {
execution = execution.withTaskRun(child);
}
executor = executor.withExecution(execution, "handledTerminatedFlowableTasks");
}
}
}
metricRegistry
@@ -1045,6 +1062,17 @@ public class ExecutorService {
var executionUpdatingTask = (ExecutionUpdatableTask) workerTask.getTask();
try {
// handle runIf: if the task's runIf expression does not render to a truthy value, mark the task run as SKIPPED and skip the update
if (!TruthUtils.isTruthy(workerTask.getRunContext().render(workerTask.getTask().getRunIf()))) {
executor.withExecution(
executor
.getExecution()
.withTaskRun(workerTask.getTaskRun().withState(State.Type.SKIPPED)),
"handleExecutionUpdatingTaskSkipped"
);
return false;
}
executor.withExecution(
executionUpdatingTask.update(executor.getExecution(), workerTask.getRunContext())
.withTaskRun(workerTask.getTaskRun().withState(State.Type.RUNNING)),
@@ -1144,7 +1172,7 @@ public class ExecutorService {
}
}
return taskRuns.size() > execution.getTaskRunList().size() ? execution.withTaskRunList(taskRuns) : null;
return taskRuns.size() > ListUtils.emptyOnNull(execution.getTaskRunList()).size() ? execution.withTaskRunList(taskRuns) : null;
}
public boolean canBePurged(final Executor executor) {

View File

@@ -82,6 +82,8 @@ public abstract class FilesService {
}
private static String resolveUniqueNameForFile(final Path path) {
return IdUtils.from(path.toString()) + "-" + path.toFile().getName();
String filename = path.getFileName().toString();
String encodedFilename = java.net.URLEncoder.encode(filename, java.nio.charset.StandardCharsets.UTF_8);
return IdUtils.from(path.toString()) + "-" + encodedFilename;
}
}
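A hedged illustration of the new encoding step (the values below follow standard URLEncoder behavior, they are not from the diff):

String encoded = java.net.URLEncoder.encode("rapport final.csv", java.nio.charset.StandardCharsets.UTF_8);
// yields "rapport+final.csv": spaces become '+', and a name like "é.txt" becomes "%C3%A9.txt"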

View File

@@ -463,7 +463,7 @@ public class FlowableUtils {
ArrayList<ResolvedTask> result = new ArrayList<>();
int index = 0;
int iteration = 0;
for (Object current : distinctValue) {
try {
String resolvedValue = current instanceof String stringValue ? stringValue : MAPPER.writeValueAsString(current);
@@ -471,7 +471,7 @@ public class FlowableUtils {
result.add(ResolvedTask.builder()
.task(task)
.value(resolvedValue)
.iteration(index++)
.iteration(iteration)
.parentId(parentTaskRun.getId())
.build()
);
@@ -479,6 +479,7 @@ public class FlowableUtils {
} catch (JsonProcessingException e) {
throw new IllegalVariableEvaluationException(e);
}
iteration++;
}
return result;

View File

@@ -0,0 +1,13 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.utils.IdUtils;
public record MultipleConditionEvent(Flow flow, Execution execution) implements HasUID {
@Override
public String uid() {
return IdUtils.fromParts(flow.uidWithoutRevision(), execution.getId());
}
}
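A minimal usage sketch (variable names assumed): the uid is stable for a given flow/execution pair, so duplicate events can be deduplicated by uid.

MultipleConditionEvent event = new MultipleConditionEvent(flow, execution);
String uid = event.uid(); // joins the flow uid (without revision) with the execution id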

View File

@@ -29,7 +29,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
public class RunContextLogger implements Supplier<org.slf4j.Logger> {
private static final int MAX_MESSAGE_LENGTH = 1024 * 10;
private static final int MAX_MESSAGE_LENGTH = 1024 * 15;
public static final String ORIGINAL_TIMESTAMP_KEY = "originalTimestamp";
private final String loggerName;
@@ -80,7 +80,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
}
List<LogEntry> result = new ArrayList<>();
long i = 0;
for (String s : split) {
result.add(LogEntry.builder()
.namespace(logEntry.getNamespace())
@@ -98,7 +97,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
.thread(event.getThreadName())
.build()
);
i++;
}
return result;
@@ -331,14 +329,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
protected void append(ILoggingEvent e) {
e = this.transform(e);
logEntries(e, logEntry)
.forEach(l -> {
try {
logQueue.emitAsync(l);
} catch (QueueException ex) {
log.warn("Unable to emit logQueue", ex);
}
});
try {
logQueue.emitAsync(logEntries(e, logEntry));
} catch (QueueException ex) {
log.warn("Unable to emit logQueue", ex);
}
}
}

View File

@@ -4,15 +4,11 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -27,12 +23,19 @@ public class RunContextProperty<T> {
private final RunContext runContext;
private final Task task;
private final AbstractTrigger trigger;
private final boolean skipCache;
RunContextProperty(Property<T> property, RunContext runContext) {
this(property, runContext, false);
}
RunContextProperty(Property<T> property, RunContext runContext, boolean skipCache) {
this.property = property;
this.runContext = runContext;
this.task = ((DefaultRunContext) runContext).getTask();
this.trigger = ((DefaultRunContext) runContext).getTrigger();
this.skipCache = skipCache;
}
private void validate() {
@@ -45,6 +48,19 @@ public class RunContextProperty<T> {
log.trace("Unable to do validation: no task or trigger found");
}
}
/**
* Returns a new {@link RunContextProperty} that will always be rendered by evaluating
* its original Pebble expression, without using any previously cached value.
* <p>
* This ensures that each time the property is rendered, the underlying
* expression is re-evaluated to produce a fresh result.
*
* @return a new {@link RunContextProperty} that bypasses the cache
*/
public RunContextProperty<T> skipCache() {
return new RunContextProperty<>(this.property, this.runContext, true);
}
/**
* Render a property then convert it to its target type and validate it.<br>
@@ -55,13 +71,13 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz)));
validate();
return as;
}
/**
* Render a property with additional variables, then convert it to its target type and validate it.<br>
*
@@ -71,7 +87,7 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz, variables)));
validate();
@@ -89,7 +105,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz)))
.orElse((T) Collections.emptyList());
@@ -108,7 +124,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz, variables)))
.orElse((T) Collections.emptyList());
@@ -127,7 +143,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass)))
.orElse((T) Collections.emptyMap());
@@ -146,11 +162,15 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass, variables)))
.orElse((T) Collections.emptyMap());
validate();
return as;
}
private Property<T> getProperty() {
return skipCache ? this.property.skipCache() : this.property;
}
}
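A minimal usage sketch of the new skipCache() hook, assuming a hypothetical Property<String> field named message and the usual runContext.render(property) entry point:

// the first render may serve a cached value; skipCache() forces re-evaluation of the Pebble expression
Optional<String> cached = runContext.render(this.message).as(String.class);
Optional<String> fresh = runContext.render(this.message).skipCache().as(String.class);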

View File

@@ -45,7 +45,7 @@ final class Secret {
for (var entry: data.entrySet()) {
if (entry.getValue() instanceof Map map) {
// if some value are of type EncryptedString we decode them and replace the object
if (EncryptedString.TYPE.equalsIgnoreCase((String)map.get("type"))) {
if (map.get("type") instanceof String typeStr && EncryptedString.TYPE.equalsIgnoreCase(typeStr)) {
try {
String decoded = decrypt((String) map.get("value"));
decryptedMap.put(entry.getKey(), decoded);

View File

@@ -0,0 +1,39 @@
package io.kestra.core.runners;
import io.kestra.core.runners.pebble.PebbleEngineFactory;
import io.kestra.core.runners.pebble.functions.SecretFunction;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.List;
@Singleton
public class SecureVariableRendererFactory {
private final PebbleEngineFactory pebbleEngineFactory;
private final ApplicationContext applicationContext;
private VariableRenderer secureVariableRenderer;
@Inject
public SecureVariableRendererFactory(ApplicationContext applicationContext, PebbleEngineFactory pebbleEngineFactory) {
this.pebbleEngineFactory = pebbleEngineFactory;
this.applicationContext = applicationContext;
}
/**
* Creates or returns the existing secured {@link VariableRenderer} instance.
*
* @return the secured {@link VariableRenderer} instance
*/
public synchronized VariableRenderer createOrGet() {
if (this.secureVariableRenderer == null) {
// Explicitly create a new instance through the application context to ensure
// any custom VariableRenderer implementation is used
secureVariableRenderer = applicationContext.createBean(VariableRenderer.class);
secureVariableRenderer.setPebbleEngine(pebbleEngineFactory.createWithMaskedFunctions(secureVariableRenderer, List.of(SecretFunction.NAME)));
}
return secureVariableRenderer;
}
}
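A hedged call-site sketch, assuming the factory is injected and VariableRenderer exposes a render(String, Map) entry point:

VariableRenderer secured = secureVariableRendererFactory.createOrGet();
String rendered = secured.render("token={{ secret('API_TOKEN') }}", Map.of()); // the secret(...) output is masked as "******"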

View File

@@ -2,121 +2,44 @@ package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.pebble.*;
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.annotation.Nullable;
import io.pebbletemplates.pebble.PebbleEngine;
import io.pebbletemplates.pebble.error.AttributeNotFoundException;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Extension;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Getter;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Singleton
public class VariableRenderer {
private static final Pattern RAW_PATTERN = Pattern.compile("(\\{%-*\\s*raw\\s*-*%}(.*?)\\{%-*\\s*endraw\\s*-*%})");
public static final int MAX_RENDERING_AMOUNT = 100;
private final PebbleEngine pebbleEngine;
private PebbleEngine pebbleEngine;
private final VariableConfiguration variableConfiguration;
@Inject
public VariableRenderer(ApplicationContext applicationContext, @Nullable VariableConfiguration variableConfiguration) {
this(applicationContext, variableConfiguration, Collections.emptyList());
this(applicationContext.getBean(PebbleEngineFactory.class), variableConfiguration);
}
public VariableRenderer(ApplicationContext applicationContext, @Nullable VariableConfiguration variableConfiguration, List<String> functionsToMask) {
public VariableRenderer(PebbleEngineFactory pebbleEngineFactory, @Nullable VariableConfiguration variableConfiguration) {
this.variableConfiguration = variableConfiguration != null ? variableConfiguration : new VariableConfiguration();
PebbleEngine.Builder pebbleBuilder = new PebbleEngine.Builder()
.registerExtensionCustomizer(ExtensionCustomizer::new)
.strictVariables(true)
.cacheActive(this.variableConfiguration.getCacheEnabled())
.newLineTrimming(false)
.autoEscaping(false);
List<Extension> extensions = applicationContext.getBeansOfType(Extension.class).stream()
.map(e -> functionsToMask.stream().anyMatch(excludedFunction -> e.getFunctions().containsKey(excludedFunction))
? extensionWithMaskedFunctions(e, functionsToMask)
: e)
.toList();
extensions.forEach(pebbleBuilder::extension);
if (this.variableConfiguration.getCacheEnabled()) {
pebbleBuilder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
}
this.pebbleEngine = pebbleBuilder.build();
this.pebbleEngine = pebbleEngineFactory.create();
}
private Extension extensionWithMaskedFunctions(Extension initialExtension, List<String> maskedFunctions) {
return (Extension) Proxy.newProxyInstance(
initialExtension.getClass().getClassLoader(),
new Class[]{Extension.class},
(proxy, method, methodArgs) -> {
if (method.getName().equals("getFunctions")) {
return initialExtension.getFunctions().entrySet().stream()
.map(entry -> {
if (maskedFunctions.contains(entry.getKey())) {
return Map.entry(entry.getKey(), this.maskedFunctionProxy(entry.getValue()));
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
return Map.entry(entry.getKey(), this.variableRendererProxy(entry.getValue()));
}
return entry;
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
return method.invoke(initialExtension, methodArgs);
}
);
public void setPebbleEngine(final PebbleEngine pebbleEngine) {
this.pebbleEngine = pebbleEngine;
}
private Function variableRendererProxy(Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class, RenderingFunctionInterface.class},
(functionProxy, functionMethod, functionArgs) -> {
if (functionMethod.getName().equals("variableRenderer")) {
return this;
}
return functionMethod.invoke(initialFunction, functionArgs);
}
);
}
private Function maskedFunctionProxy(Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class},
(functionProxy, functionMethod, functionArgs) -> {
Object result;
try {
result = functionMethod.invoke(initialFunction, functionArgs);
} catch (InvocationTargetException e) {
throw e.getCause();
}
if (functionMethod.getName().equals("execute")) {
return "******";
}
return result;
}
);
}
public static IllegalVariableEvaluationException properPebbleException(PebbleException initialExtension) {
if (initialExtension instanceof AttributeNotFoundException current) {
return new IllegalVariableEvaluationException(

View File

@@ -508,14 +508,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
Execution execution = workerTrigger.getTrigger().isFailOnTriggerError() ? TriggerService.generateExecution(workerTrigger.getTrigger(), workerTrigger.getConditionContext(), workerTrigger.getTriggerContext(), (Output) null)
.withState(FAILED) : null;
if (execution != null) {
RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution))
.forEach(log -> {
try {
logQueue.emitAsync(log);
} catch (QueueException ex) {
// fail silently
}
});
try {
logQueue.emitAsync(RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution)));
} catch (QueueException ex) {
// fail silently
}
}
this.workerTriggerResultQueue.emit(
WorkerTriggerResult.builder()
@@ -764,6 +761,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
this.workerTaskResultQueue.emit(workerTaskResult);
// upload the cache file; the hash may not be present if we didn't succeed in computing it
@@ -796,6 +794,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
// If it's a message too big, we remove the outputs
failed = failed.withOutputs(Variables.empty());
}
if (e instanceof UnsupportedMessageException) {
// we expect the offending character to be in the outputs, so we remove them
failed = failed.withOutputs(Variables.empty());
}
WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
@@ -818,7 +820,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
private Optional<String> hashTask(RunContext runContext, Task task) {
try {
var map = JacksonMapper.toMap(task);
var rMap = runContext.render(map);
// If there are task-provided variables, rendering the task may fail.
// The best we can do is to add a fake 'workingDir' as it's an often-added variable,
// and it should not be part of the task hash.
Map<String, Object> variables = Map.of("workingDir", "workingDir");
var rMap = runContext.render(map, variables);
var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
MessageDigest digest = MessageDigest.getInstance("SHA-256");
digest.update(json);

View File

@@ -0,0 +1,118 @@
package io.kestra.core.runners.pebble;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Nullable;
import io.pebbletemplates.pebble.PebbleEngine;
import io.pebbletemplates.pebble.extension.Extension;
import io.pebbletemplates.pebble.extension.Function;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Singleton
public class PebbleEngineFactory {
private final ApplicationContext applicationContext;
private final VariableRenderer.VariableConfiguration variableConfiguration;
@Inject
public PebbleEngineFactory(ApplicationContext applicationContext, @Nullable VariableRenderer.VariableConfiguration variableConfiguration) {
this.applicationContext = applicationContext;
this.variableConfiguration = variableConfiguration;
}
public PebbleEngine create() {
PebbleEngine.Builder builder = newPebbleEngineBuilder();
this.applicationContext.getBeansOfType(Extension.class).forEach(builder::extension);
return builder.build();
}
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
PebbleEngine.Builder builder = newPebbleEngineBuilder();
this.applicationContext.getBeansOfType(Extension.class).stream()
.map(e -> functionsToMask.stream().anyMatch(fun -> e.getFunctions().containsKey(fun))
? extensionWithMaskedFunctions(renderer, e, functionsToMask)
: e)
.forEach(builder::extension);
return builder.build();
}
private PebbleEngine.Builder newPebbleEngineBuilder() {
PebbleEngine.Builder builder = new PebbleEngine.Builder()
.registerExtensionCustomizer(ExtensionCustomizer::new)
.strictVariables(true)
.cacheActive(this.variableConfiguration.getCacheEnabled())
.newLineTrimming(false)
.autoEscaping(false);
if (this.variableConfiguration.getCacheEnabled()) {
builder = builder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
}
return builder;
}
private Extension extensionWithMaskedFunctions(VariableRenderer renderer, Extension initialExtension, List<String> maskedFunctions) {
return (Extension) Proxy.newProxyInstance(
initialExtension.getClass().getClassLoader(),
new Class[]{Extension.class},
(proxy, method, methodArgs) -> {
if (method.getName().equals("getFunctions")) {
return initialExtension.getFunctions().entrySet().stream()
.map(entry -> {
if (maskedFunctions.contains(entry.getKey())) {
return Map.entry(entry.getKey(), this.maskedFunctionProxy(entry.getValue()));
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
return Map.entry(entry.getKey(), this.variableRendererProxy(renderer, entry.getValue()));
}
return entry;
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
return method.invoke(initialExtension, methodArgs);
}
);
}
private Function variableRendererProxy(VariableRenderer renderer, Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class, RenderingFunctionInterface.class},
(functionProxy, functionMethod, functionArgs) -> {
if (functionMethod.getName().equals("variableRenderer")) {
return renderer;
}
return functionMethod.invoke(initialFunction, functionArgs);
}
);
}
private Function maskedFunctionProxy(Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class},
(functionProxy, functionMethod, functionArgs) -> {
Object result;
try {
result = functionMethod.invoke(initialFunction, functionArgs);
} catch (InvocationTargetException e) {
throw e.getCause();
}
if (functionMethod.getName().equals("execute")) {
return "******";
}
return result;
}
);
}
}
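A minimal sketch of the two construction paths this factory now centralizes (injection of the factory and renderer assumed):

PebbleEngine plain = pebbleEngineFactory.create();
PebbleEngine masked = pebbleEngineFactory.createWithMaskedFunctions(renderer, List.of(SecretFunction.NAME));
// in the masked engine the listed functions still run, but their proxied execute() returns "******"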

View File

@@ -1,14 +1,15 @@
package io.kestra.core.runners.pebble.filters;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Filter;
import io.pebbletemplates.pebble.template.EvaluationContext;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import java.util.List;
import java.util.Map;
public class ChunkFilter implements Filter {
@Override
public List<String> getArgumentNames() {
@@ -30,6 +31,10 @@ public class ChunkFilter implements Filter {
throw new PebbleException(null, "'chunk' filter can only be applied to List. Actual type was: " + input.getClass().getName(), lineNumber, self.getName());
}
return Lists.partition((List) input, ((Long) args.get("size")).intValue());
Object sizeObj = args.get("size");
if (!(sizeObj instanceof Number)) {
throw new PebbleException(null, "'chunk' filter argument 'size' must be a number. Actual type was: " + sizeObj.getClass().getName(), lineNumber, self.getName());
}
return Lists.partition((List) input, ((Number) sizeObj).intValue());
}
}
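For illustration, the filter delegates to Guava's Lists.partition, which the new guard now reaches for any numeric size:

List<List<Integer>> chunks = Lists.partition(List.of(1, 2, 3, 4, 5), 2);
// chunks == [[1, 2], [3, 4], [5]]; the last chunk may be smaller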

View File

@@ -17,12 +17,17 @@ import java.util.List;
import java.util.Map;
public class JqFilter implements Filter {
private final Scope scope;
// Load Scope once as static to avoid repeated initialization
// This improves performance by loading builtin functions only once when the class loads
private static final Scope SCOPE;
private final List<String> argumentNames = new ArrayList<>();
static {
SCOPE = Scope.newEmptyScope();
BuiltinFunctionLoader.getInstance().loadFunctions(Versions.JQ_1_6, SCOPE);
}
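// Per-evaluation child scopes (see Scope.newChildScope(SCOPE) below) inherit the builtins loaded
// once above, so each filter call avoids reloading the whole jq function table.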
public JqFilter() {
scope = Scope.newEmptyScope();
BuiltinFunctionLoader.getInstance().loadFunctions(Versions.JQ_1_6, scope);
this.argumentNames.add("expression");
}
@@ -43,10 +48,7 @@ public class JqFilter implements Filter {
String pattern = (String) args.get("expression");
Scope rootScope = Scope.newEmptyScope();
BuiltinFunctionLoader.getInstance().loadFunctions(Versions.JQ_1_6, rootScope);
try {
JsonQuery q = JsonQuery.compile(pattern, Versions.JQ_1_6);
JsonNode in;
@@ -59,7 +61,7 @@ public class JqFilter implements Filter {
final List<Object> out = new ArrayList<>();
try {
q.apply(scope, in, v -> {
q.apply(Scope.newChildScope(SCOPE), in, v -> {
if (v instanceof TextNode) {
out.add(v.textValue());
} else if (v instanceof NullNode) {

View File

@@ -8,6 +8,7 @@ import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
@@ -29,10 +30,7 @@ import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.*;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.*;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.util.CollectionUtils;
@@ -91,7 +89,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private volatile Boolean isReady = false;
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> scheduledFuture;
private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> executionMonitorFuture;
@Getter
protected SchedulerTriggerStateInterface triggerState;
@@ -152,7 +152,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.flowListeners.run();
this.flowListeners.listen(this::initializedTriggers);
ScheduledFuture<?> evaluationLoop = scheduleExecutor.scheduleAtFixedRate(
scheduledFuture = scheduleExecutor.scheduleAtFixedRate(
this::handle,
0,
1,
@@ -162,10 +162,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the evaluation loop thread
Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
() -> {
Await.until(evaluationLoop::isDone);
Await.until(scheduledFuture::isDone);
try {
evaluationLoop.get();
scheduledFuture.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -177,7 +177,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
);
// Periodically report metrics and logs of running executions
ScheduledFuture<?> monitoringLoop = executionMonitorExecutor.scheduleWithFixedDelay(
executionMonitorFuture = executionMonitorExecutor.scheduleWithFixedDelay(
this::executionMonitor,
30,
10,
@@ -187,10 +187,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the monitoring loop thread
Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
() -> {
Await.until(monitoringLoop::isDone);
Await.until(executionMonitorFuture::isDone);
try {
monitoringLoop.get();
executionMonitorFuture.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -302,6 +302,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// Initialized local trigger state,
// and if some flows were created outside the box, for example from the CLI,
// then we may have some triggers that are not created yet.
/* FIXME: There is a race between Kafka stream consumption & initializedTriggers: we can override a trigger update coming from a stream consumption with an old one because stream consumption is not waiting for trigger initialization
* Example: we see a SUCCESS execution so we reset the trigger's executionId, but then initializedTriggers resubmits an old trigger state for some reason (e.g. evaluationDate) */
private void initializedTriggers(List<FlowWithSource> flows) {
record FlowAndTrigger(FlowWithSource flow, AbstractTrigger trigger) {
@Override
@@ -318,7 +320,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
}
synchronized (this) { // we need a sync block as we read then update so we should not do it in multiple threads concurrently
List<Trigger> triggers = triggerState.findAllForAllTenants();
Map<String, Trigger> triggers = triggerState.findAllForAllTenants().stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
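// indexing by uid turns the per-trigger lookup below into a map hit instead of a full list scan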
flows
.stream()
@@ -328,7 +330,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.flatMap(flow -> flow.getTriggers().stream().filter(trigger -> trigger instanceof WorkerTriggerInterface).map(trigger -> new FlowAndTrigger(flow, trigger)))
.distinct()
.forEach(flowAndTrigger -> {
Optional<Trigger> trigger = triggers.stream().filter(t -> t.uid().equals(Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger()))).findFirst(); // must have one or none
String triggerUid = Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger());
Optional<Trigger> trigger = Optional.ofNullable(triggers.get(triggerUid));
if (trigger.isEmpty()) {
RunContext runContext = runContextFactory.of(flowAndTrigger.flow(), flowAndTrigger.trigger());
ConditionContext conditionContext = conditionService.conditionContext(runContext, flowAndTrigger.flow(), null);
@@ -371,10 +374,13 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.triggerState.update(lastUpdate);
}
} else if (recoverMissedSchedules == RecoverMissedSchedules.NONE) {
lastUpdate = trigger.get().toBuilder().nextExecutionDate(schedule.nextEvaluationDate()).build();
} else {
ZonedDateTime nextEvaluationDate = schedule.nextEvaluationDate();
if (recoverMissedSchedules == RecoverMissedSchedules.NONE && !Objects.equals(trigger.get().getNextExecutionDate(), nextEvaluationDate)) {
lastUpdate = trigger.get().toBuilder().nextExecutionDate(nextEvaluationDate).build();
this.triggerState.update(lastUpdate);
this.triggerState.update(lastUpdate);
}
}
// Used for schedulableNextDate
FlowWithWorkerTrigger flowWithWorkerTrigger = FlowWithWorkerTrigger.builder()
@@ -467,9 +473,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private List<FlowWithTriggers> computeSchedulable(List<FlowWithSource> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
List<String> flowIds = flows.stream().map(FlowId::uidWithoutRevision).toList();
Map<String, Trigger> triggerById = triggerContextsToEvaluate.stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
// delete trigger which flow has been deleted
triggerContextsToEvaluate.stream()
.filter(trigger -> !flows.stream().map(FlowId::uidWithoutRevision).toList().contains(FlowId.uid(trigger)))
.filter(trigger -> !flowIds.contains(FlowId.uid(trigger)))
.forEach(trigger -> {
try {
this.triggerState.delete(trigger);
@@ -491,12 +500,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.map(abstractTrigger -> {
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
Trigger triggerContext = null;
Trigger lastTrigger = triggerContextsToEvaluate
.stream()
.filter(triggerContextToFind -> triggerContextToFind.uid().equals(Trigger.uid(flow, abstractTrigger)))
.findFirst()
.orElse(null);
Trigger triggerContext;
Trigger lastTrigger = triggerById.get(Trigger.uid(flow, abstractTrigger));
// If a trigger is not found in triggers to evaluate, then we ignore it
if (lastTrigger == null) {
return null;
@@ -1006,8 +1011,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
setState(ServiceState.TERMINATING);
this.receiveCancellations.forEach(Runnable::run);
this.scheduleExecutor.shutdown();
this.executionMonitorExecutor.shutdown();
ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));
ExecutorsUtils.closeScheduledThreadPool(executionMonitorExecutor, Duration.ofSeconds(5), List.of(executionMonitorFuture));
try {
if (onClose != null) {
onClose.run();

View File

@@ -1,6 +1,7 @@
package io.kestra.core.server;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@@ -8,9 +9,11 @@ import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +28,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
protected final ServerConfig serverConfig;
private final AtomicBoolean isStopped = new AtomicBoolean(false);
private ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> scheduledFuture;
private Instant lastScheduledExecution;
/**
@@ -98,7 +102,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
Duration scheduleInterval = getScheduleInterval();
log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval);
scheduledExecutorService.scheduleAtFixedRate(
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
this,
0,
scheduleInterval.toSeconds(),
@@ -133,20 +137,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
@Override
public void close() {
if (isStopped.compareAndSet(false, true) && scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
if (scheduledExecutorService.isTerminated()) {
return;
}
try {
if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Timeout", name);
}
log.debug("Stopped scheduled '{}' task.", name);
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Interrupted.", name);
}
ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(scheduledFuture));
}
}
}

View File

@@ -14,6 +14,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.input.InputAndValue;
import io.kestra.core.models.hierarchies.AbstractGraphTask;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
@@ -56,8 +57,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwFunction;
import static io.kestra.core.utils.Rethrow.throwPredicate;
import static io.kestra.core.utils.Rethrow.*;
@Singleton
@Slf4j
@@ -122,21 +122,38 @@ public class ExecutionService {
* Retry set the given taskRun in created state
* and return the execution in running state
**/
public Execution retryTask(Execution execution, String taskRunId) {
List<TaskRun> newTaskRuns = execution
.getTaskRunList()
.stream()
.map(taskRun -> {
if (taskRun.getId().equals(taskRunId)) {
return taskRun
.withState(State.Type.CREATED);
public Execution retryTask(Execution execution, Flow flow, String taskRunId) throws InternalException {
TaskRun taskRun = execution.findTaskRunByTaskRunId(taskRunId).withState(State.Type.CREATED);
List<TaskRun> taskRunList = execution.getTaskRunList();
if (taskRun.getParentTaskRunId() != null) {
// we need to find the parent to remove any error or finally tasks already executed
TaskRun parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
Task parentTask = flow.findTaskByTaskId(parentTaskRun.getTaskId());
if (parentTask instanceof FlowableTask<?> flowableTask) {
if (flowableTask.getErrors() != null) {
List<Task> allErrors = Stream.concat(flowableTask.getErrors().stream()
.filter(task -> task.isFlowable() && ((FlowableTask<?>) task).getErrors() != null)
.flatMap(task -> ((FlowableTask<?>) task).getErrors().stream()),
flowableTask.getErrors().stream())
.toList();
allErrors.forEach(error -> taskRunList.removeIf(t -> t.getTaskId().equals(error.getId())));
}
return taskRun;
})
.toList();
if (flowableTask.getFinally() != null) {
List<Task> allFinally = Stream.concat(flowableTask.getFinally().stream()
.filter(task -> task.isFlowable() && ((FlowableTask<?>) task).getFinally() != null)
.flatMap(task -> ((FlowableTask<?>) task).getFinally().stream()),
flowableTask.getFinally().stream())
.toList();
allFinally.forEach(error -> taskRunList.removeIf(t -> t.getTaskId().equals(error.getId())));
}
}
return execution.withTaskRunList(newTaskRuns).withState(State.Type.RUNNING);
return execution.withTaskRunList(taskRunList).withTaskRun(taskRun).withState(State.Type.RUNNING);
}
return execution.withTaskRun(taskRun).withState(State.Type.RUNNING);
}
public Execution retryWaitFor(Execution execution, String flowableTaskRunId) {
@@ -317,6 +334,32 @@ public class ExecutionService {
return revision != null ? newExecution.withFlowRevision(revision) : newExecution;
}
public Execution changeTaskRunState(final Execution execution, Flow flow, String taskRunId, State.Type newState) throws Exception {
Execution newExecution = markAs(execution, flow, taskRunId, newState);
// if the execution was terminated, it could have executed error/finally/afterExecution tasks; we must remove them as the execution will be restarted
if (execution.getState().isTerminated()) {
List<TaskRun> newTaskRuns = newExecution.getTaskRunList();
// We need to remove global error tasks and flowable error tasks if any
flow
.allErrorsWithChildren()
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// We need to remove global finally tasks and flowable finally tasks if any
flow
.allFinallyWithChildren()
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
// We need to remove afterExecution tasks
ListUtils.emptyOnNull(flow.getAfterExecution())
.forEach(task -> newTaskRuns.removeIf(taskRun -> taskRun.getTaskId().equals(task.getId())));
return newExecution.withTaskRunList(newTaskRuns);
} else {
return newExecution;
}
}
public Execution markAs(final Execution execution, FlowInterface flow, String taskRunId, State.Type newState) throws Exception {
return this.markAs(execution, flow, taskRunId, newState, null, null);
}
@@ -396,7 +439,8 @@ public class ExecutionService {
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
@Nullable List<State.Type> state,
int batchSize
) throws IOException {
PurgeResult purgeResult = this.executionRepository
.find(
@@ -413,24 +457,27 @@ public class ExecutionService {
null,
true
)
.map(throwFunction(execution -> {
.buffer(batchSize)
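// batching lets each repository purge executions in bulk (see the purge(List<Execution>) overloads) instead of one by one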
.map(throwFunction(executions -> {
PurgeResult.PurgeResultBuilder<?, ?> builder = PurgeResult.builder();
if (purgeExecution) {
builder.executionsCount(this.executionRepository.purge(execution));
builder.executionsCount(this.executionRepository.purge(executions));
}
if (purgeLog) {
builder.logsCount(this.logRepository.purge(execution));
builder.logsCount(this.logRepository.purge(executions));
}
if (purgeMetric) {
builder.metricsCount(this.metricRepository.purge(execution));
builder.metricsCount(this.metricRepository.purge(executions));
}
if (purgeStorage) {
URI uri = StorageContext.forExecution(execution).getExecutionStorageURI(StorageContext.KESTRA_SCHEME);
builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), execution.getNamespace(), uri).size());
executions.forEach(throwConsumer(execution -> {
URI uri = StorageContext.forExecution(execution).getExecutionStorageURI(StorageContext.KESTRA_SCHEME);
builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), execution.getNamespace(), uri).size());
}));
}
return (PurgeResult) builder.build();
@@ -671,7 +718,7 @@ public class ExecutionService {
// An edge case can occur where the execution is resumed automatically before we resume it with a KILLING state.
try {
newExecution = this.resume(execution, flow, State.Type.KILLING, null);
newExecution = newExecution.withState(afterKillState.orElse(newExecution.getState().getCurrent()));
newExecution = newExecution.withState(killingOrAfterKillState);
} catch (Exception e) {
// if we cannot resume, we set it anyway to killing, so we don't throw
log.warn("Unable to resume a paused execution before killing it", e);
@@ -684,6 +731,7 @@ public class ExecutionService {
eventPublisher.publishEvent(new CrudEvent<>(newExecution, execution, CrudEventType.UPDATE));
return newExecution;
}
public Execution kill(Execution execution, FlowInterface flow) {
return this.kill(execution, flow, Optional.empty());
}

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ModelValidator;
@@ -51,7 +52,6 @@ import java.util.stream.StreamSupport;
@Singleton
@Slf4j
public class FlowService {
@Inject
Optional<FlowRepositoryInterface> flowRepository;
@@ -236,6 +236,7 @@ public class FlowService {
}
List<String> warnings = new ArrayList<>(checkValidSubflows(flow, tenantId));
List<io.kestra.plugin.core.trigger.Flow> flowTriggers = ListUtils.emptyOnNull(flow.getTriggers()).stream()
.filter(io.kestra.plugin.core.trigger.Flow.class::isInstance)
.map(io.kestra.plugin.core.trigger.Flow.class::cast)
@@ -246,6 +247,21 @@ public class FlowService {
}
});
// add a warning for runnable-only properties (timeout, workerGroup, taskCache) when used on a non-runnable task
flow.allTasksWithChilds().forEach(task -> {
if (!(task instanceof RunnableTask<?>)) {
if (task.getTimeout() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'timeout' property as it's only relevant for runnable tasks.");
}
if (task.getTaskCache() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'taskCache' property as it's only relevant for runnable tasks.");
}
if (task.getWorkerGroup() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'workerGroup' property as it's only relevant for runnable tasks.");
}
}
});
return warnings;
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.services;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
@@ -10,7 +11,6 @@ import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInte
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.ListUtils;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -24,16 +24,16 @@ import java.util.stream.Stream;
@Singleton
public class FlowTriggerService {
@Inject
private ConditionService conditionService;
private final ConditionService conditionService;
private final RunContextFactory runContextFactory;
private final FlowService flowService;
@Inject
private RunContextFactory runContextFactory;
public FlowTriggerService(ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) {
this.conditionService = conditionService;
this.runContextFactory = runContextFactory;
this.flowService = flowService;
}
@Inject
private FlowService flowService;
// used in EE only
public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<FlowWithSource> allFlows) {
return allFlows
.filter(flow -> !flow.isDisabled())
@@ -49,14 +49,147 @@ public class FlowTriggerService {
.map(io.kestra.plugin.core.trigger.Flow.class::cast);
}
public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, List<? extends Flow> allFlows, Optional<MultipleConditionStorageInterface> multipleConditionStorage) {
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
/**
* This method computes the executions to trigger via flow triggers for a given execution.
* It only computes those depending on standard (non-multiple, non-precondition) conditions, so it must be used
* in conjunction with {@link #computeExecutionsFromFlowTriggerPreconditions(Execution, Flow, MultipleConditionStorageInterface)}.
*/
public List<Execution> computeExecutionsFromFlowTriggerConditions(Execution execution, Flow flow) {
List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
.stream()
// we must keep only triggers with neither multiple conditions nor preconditions, to avoid evaluating triggers that have both standard and multiple conditions twice
.filter(it -> it.getTrigger().getPreconditions() == null && ListUtils.emptyOnNull(it.getTrigger().getConditions()).stream().noneMatch(MultipleCondition.class::isInstance))
.toList();
// short-circuit empty triggers to evaluate
if (flowWithFlowTriggers.isEmpty()) {
return Collections.emptyList();
}
// compute all executions to create from flow triggers without taking multiple conditions into account
return flowWithFlowTriggers.stream()
.map(f -> f.getTrigger().evaluate(
Optional.empty(),
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
}
/**
* This method computes the executions to trigger via flow triggers for a given execution.
* It only computes those depending on multiple conditions and preconditions, so it must be used
* in conjunction with {@link #computeExecutionsFromFlowTriggerConditions(Execution, Flow)}.
*/
public List<Execution> computeExecutionsFromFlowTriggerPreconditions(Execution execution, Flow flow, MultipleConditionStorageInterface multipleConditionStorage) {
List<FlowWithFlowTrigger> flowWithFlowTriggers = computeFlowTriggers(execution, flow)
.stream()
// we must keep only triggers with multiple conditions or preconditions, to avoid evaluating triggers that only have standard conditions twice
.filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getPreconditions() != null || ListUtils.emptyOnNull(flowWithFlowTrigger.getTrigger().getConditions()).stream().anyMatch(MultipleCondition.class::isInstance))
.toList();
// short-circuit empty triggers to evaluate
if (flowWithFlowTriggers.isEmpty()) {
return Collections.emptyList();
}
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = flowWithFlowTriggers.stream()
.flatMap(flowWithFlowTrigger -> flowTriggerMultipleConditions(flowWithFlowTrigger)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
flowWithFlowTrigger.getFlow(),
multipleConditionStorage.getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
flowWithFlowTrigger.getTrigger(),
multipleCondition
)
)
)
// avoid evaluating expired windows (e.g. for a daily time window or a deadline)
.filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
.toList();
// evaluate multiple conditions
Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
Map<String, Boolean> results = f.getMultipleCondition()
.getConditions()
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
conditionService.isValid(e.getValue(), f.getFlow(), execution)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return Map.entry(f, f.getMultipleConditionWindow().with(results));
})
.filter(e -> !e.getValue().getResults().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// persist results
multipleConditionStorage.save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
// compute all executions to create from flow triggers now that multiple conditions storage is populated
List<Execution> executions = flowWithFlowTriggers.stream()
// will evaluate conditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage
)
)
// will evaluate preconditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger().getPreconditions(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage
)
)
.map(f -> f.getTrigger().evaluate(
Optional.of(multipleConditionStorage),
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
// purge fulfilled or expired multiple condition windows
Stream.concat(
multipleConditionWindowsByFlow.entrySet().stream()
.map(e -> Map.entry(
e.getKey().getMultipleCondition(),
e.getValue()
))
.filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
)
.map(Map.Entry::getValue),
multipleConditionStorage.expired(execution.getTenantId()).stream()
).forEach(multipleConditionStorage::delete);
return executions;
}
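// Hypothetical call-site sketch: both halves must run for each execution update, e.g.
//   executions.addAll(flowTriggerService.computeExecutionsFromFlowTriggerConditions(execution, flow));
//   executions.addAll(flowTriggerService.computeExecutionsFromFlowTriggerPreconditions(execution, flow, multipleConditionStorage));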
private List<FlowWithFlowTrigger> computeFlowTriggers(Execution execution, Flow flow) {
if (
// prevent recursive flow triggers
.filter(flow -> flowService.removeUnwanted(flow, execution))
// ensure flow & triggers are enabled
.filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
!flowService.removeUnwanted(flow, execution) ||
// filter out Test Executions
execution.getKind() != null ||
// ensure flow & triggers are enabled
flow.isDisabled() || flow instanceof FlowWithException ||
flow.getTriggers() == null || flow.getTriggers().isEmpty()) {
return Collections.emptyList();
}
return flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger))
// filter on the execution state the flow listen to
.filter(flowWithFlowTrigger -> flowWithFlowTrigger.getTrigger().getStates().contains(execution.getState().getCurrent()))
// validate flow triggers conditions excluding multiple conditions
@@ -71,96 +204,6 @@ public class FlowTriggerService {
execution
)
)).toList();
// short-circuit empty triggers to evaluate
if (validTriggersBeforeMultipleConditionEval.isEmpty()) {
return Collections.emptyList();
}
Map<FlowWithFlowTriggerAndMultipleCondition, MultipleConditionWindow> multipleConditionWindowsByFlow = null;
if (multipleConditionStorage.isPresent()) {
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
.flatMap(flowWithFlowTrigger -> flowTriggerMultipleConditions(flowWithFlowTrigger)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
flowWithFlowTrigger.getFlow(),
multipleConditionStorage.get().getOrCreate(flowWithFlowTrigger.getFlow(), multipleCondition, execution.getOutputs()),
flowWithFlowTrigger.getTrigger(),
multipleCondition
)
)
)
// avoid evaluating expired windows (for ex for daily time window or deadline)
.filter(flowWithFlowTriggerAndMultipleCondition -> flowWithFlowTriggerAndMultipleCondition.getMultipleConditionWindow().isValid(ZonedDateTime.now()))
.toList();
// evaluate multiple conditions
multipleConditionWindowsByFlow = flowWithMultipleConditionsToEvaluate.stream().map(f -> {
Map<String, Boolean> results = f.getMultipleCondition()
.getConditions()
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
conditionService.isValid(e.getValue(), f.getFlow(), execution)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return Map.entry(f, f.getMultipleConditionWindow().with(results));
})
.filter(e -> !e.getValue().getResults().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// persist results
multipleConditionStorage.get().save(new ArrayList<>(multipleConditionWindowsByFlow.values()));
}
// compute all executions to create from flow triggers now that multiple conditions storage is populated
List<Execution> executions = validTriggersBeforeMultipleConditionEval.stream()
// will evaluate conditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage.orElse(null)
)
)
// will evaluate preconditions
.filter(flowWithFlowTrigger ->
conditionService.isValid(
flowWithFlowTrigger.getTrigger().getPreconditions(),
flowWithFlowTrigger.getFlow(),
execution,
multipleConditionStorage.orElse(null)
)
)
.map(f -> f.getTrigger().evaluate(
multipleConditionStorage,
runContextFactory.of(f.getFlow(), execution),
f.getFlow(),
execution
))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
if (multipleConditionStorage.isPresent()) {
// purge fulfilled or expired multiple condition windows
Stream.concat(
multipleConditionWindowsByFlow.entrySet().stream()
.map(e -> Map.entry(
e.getKey().getMultipleCondition(),
e.getValue()
))
.filter(e -> !Boolean.FALSE.equals(e.getKey().getResetOnSuccess()) &&
e.getKey().getConditions().size() == Optional.ofNullable(e.getValue().getResults()).map(Map::size).orElse(0)
)
.map(Map.Entry::getValue),
multipleConditionStorage.get().expired(execution.getTenantId()).stream()
).forEach(multipleConditionStorage.get()::delete);
}
return executions;
}
private Stream<MultipleCondition> flowTriggerMultipleConditions(FlowWithFlowTrigger flowWithFlowTrigger) {

View File

@@ -173,18 +173,15 @@ public class PluginDefaultService {
try {
return this.injectAllDefaults(flow, false);
} catch (Exception e) {
RunContextLogger
.logEntries(
Execution.loggingEventFromException(e),
LogEntry.of(execution)
)
.forEach(logEntry -> {
try {
logQueue.emitAsync(logEntry);
} catch (QueueException e1) {
// silently do nothing
}
});
try {
logQueue.emitAsync(RunContextLogger
.logEntries(
Execution.loggingEventFromException(e),
LogEntry.of(execution)
));
} catch (QueueException e1) {
// silently do nothing
}
return readWithoutDefaultsOrThrow(flow);
}
}

View File

@@ -3,6 +3,7 @@ package io.kestra.core.services;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import jakarta.annotation.Nullable;
import jakarta.inject.Singleton;
import java.util.Collections;
@@ -14,6 +15,7 @@ public class SkipExecutionService {
private volatile List<FlowId> skipFlows = Collections.emptyList();
private volatile List<NamespaceId> skipNamespaces = Collections.emptyList();
private volatile List<String> skipTenants = Collections.emptyList();
private volatile List<String> skipIndexerRecords = Collections.emptyList();
public synchronized void setSkipExecutions(List<String> skipExecutions) {
this.skipExecutions = skipExecutions == null ? Collections.emptyList() : skipExecutions;
@@ -31,6 +33,10 @@ public class SkipExecutionService {
this.skipTenants = skipTenants == null ? Collections.emptyList() : skipTenants;
}
public synchronized void setSkipIndexerRecords(List<String> skipIndexerRecords) {
this.skipIndexerRecords = skipIndexerRecords == null ? Collections.emptyList() : skipIndexerRecords;
}
/**
* Warning: this method doesn't check the flow, so it must be used only when none of the others can be used.
*/
@@ -46,6 +52,14 @@ public class SkipExecutionService {
return skipExecution(taskRun.getTenantId(), taskRun.getNamespace(), taskRun.getFlowId(), taskRun.getExecutionId());
}
/**
* Skip an indexer record based on its key.
* @param key the record key as computed by <code>QueueService.key(record)</code>, can be null
*/
public boolean skipIndexerRecord(@Nullable String key) {
return key != null && skipIndexerRecords.contains(key);
}
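// Hypothetical indexer loop, using the key format the javadoc above refers to:
//   if (skipExecutionService.skipIndexerRecord(queueService.key(record))) { continue; }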
@VisibleForTesting
boolean skipExecution(String tenant, String namespace, String flow, String executionId) {
return (tenant != null && skipTenants.contains(tenant)) ||

View File

@@ -6,6 +6,10 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
/**
* @deprecated use {@link org.awaitility.Awaitility} instead
*/
@Deprecated
public class Await {
private static final Duration defaultSleep = Duration.ofMillis(100);

View File

@@ -3,12 +3,16 @@ package io.kestra.core.utils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.*;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class ExecutorsUtils {
@Inject
private ThreadMainFactoryBuilder threadFactoryBuilder;
@@ -61,6 +65,29 @@ public class ExecutorsUtils {
);
}
public static void closeScheduledThreadPool(ScheduledExecutorService scheduledExecutorService, Duration gracePeriod, List<ScheduledFuture<?>> taskFutures) {
scheduledExecutorService.shutdown();
if (scheduledExecutorService.isTerminated()) {
return;
}
try {
if (!scheduledExecutorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
log.warn("Failed to shutdown the ScheduledThreadPoolExecutor during grace period, forcing it to shutdown now");
// Ensure the scheduled task reaches a terminal state to avoid possible memory leak
ListUtils.emptyOnNull(taskFutures).forEach(taskFuture -> taskFuture.cancel(true));
scheduledExecutorService.shutdownNow();
}
log.debug("Stopped scheduled ScheduledThreadPoolExecutor.");
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
log.debug("Failed to shutdown the ScheduledThreadPoolExecutor.");
}
}
private ExecutorService wrap(String name, ExecutorService executorService) {
return ExecutorServiceMetrics.monitor(
meterRegistry,

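A usage sketch for the new shutdown helper; the heartbeat task and the five-second grace period are illustrative assumptions:

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> heartbeat = scheduler.scheduleAtFixedRate(
    () -> log.debug("heartbeat"), 0, 1, TimeUnit.SECONDS);
// Cancels the tracked futures and calls shutdownNow() if the pool
// does not terminate within the grace period.
ExecutorsUtils.closeScheduledThreadPool(scheduler, Duration.ofSeconds(5), List.of(heartbeat));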
View File

@@ -54,9 +54,10 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it.");
}
List<Task> allTasks = value.allTasksWithChilds();
// tasks unique id
List<String> taskIds = value.allTasksWithChilds()
.stream()
List<String> taskIds = allTasks.stream()
.map(Task::getId)
.toList();
@@ -72,8 +73,8 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
violations.add("Duplicate trigger id with name [" + String.join(", ", duplicateIds) + "]");
}
value.allTasksWithChilds()
.stream().filter(task -> task instanceof ExecutableTask<?> executableTask
allTasks.stream()
.filter(task -> task instanceof ExecutableTask<?> executableTask
&& value.getId().equals(executableTask.subflowId().flowId())
&& value.getNamespace().equals(executableTask.subflowId().namespace()))
.forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));
@@ -102,7 +103,7 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
.map(input -> Pattern.compile("\\{\\{\\s*inputs." + input.getId() + "\\s*\\}\\}"))
.collect(Collectors.toList());
List<String> invalidTasks = value.allTasks()
List<String> invalidTasks = allTasks.stream()
.filter(task -> checkObjectFieldsWithPatterns(task, inputsWithMinusPatterns))
.map(task -> task.getId())
.collect(Collectors.toList());
@@ -112,12 +113,12 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
" [" + String.join(", ", invalidTasks) + "]");
}
List<Pattern> outputsWithMinusPattern = value.allTasks()
List<Pattern> outputsWithMinusPattern = allTasks.stream()
.filter(output -> Optional.ofNullable(output.getId()).orElse("").contains("-"))
.map(output -> Pattern.compile("\\{\\{\\s*outputs\\." + output.getId() + "\\.[^}]+\\s*\\}\\}"))
.collect(Collectors.toList());
invalidTasks = value.allTasks()
invalidTasks = allTasks.stream()
.filter(task -> checkObjectFieldsWithPatterns(task, outputsWithMinusPattern))
.map(task -> task.getId())
.collect(Collectors.toList());

View File

@@ -90,7 +90,7 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
private static final String OUTPUTS_VAR = "outputs";
@NotNull
private Property<String> expression;
private Property<Boolean> expression;
/** {@inheritDoc} **/
@SuppressWarnings("unchecked")
@@ -105,9 +105,8 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
conditionContext.getVariables(),
Map.of(TRIGGER_VAR, Map.of(OUTPUTS_VAR, conditionContext.getExecution().getOutputs()))
);
String render = conditionContext.getRunContext().render(expression).as(String.class, variables).orElseThrow();
return !(render.isBlank() || render.trim().equals("false"));
return conditionContext.getRunContext().render(expression).skipCache().as(Boolean.class, variables).orElseThrow();
}
private boolean hasNoOutputs(final Execution execution) {

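Since `expression` is now a Boolean property rendered with skipCache(), it is re-evaluated on every condition check instead of reusing a cached render. A caller-side sketch, using the Property builder pattern that also appears in the tests later in this diff (the expression itself is illustrative):

Property<Boolean> expression = Property.<Boolean>builder()
    .expression("{{ trigger.outputs.status == 'OK' }}")
    .build();
// Rendered per evaluation, bypassing the per-run render cache:
// conditionContext.getRunContext().render(expression).skipCache().as(Boolean.class, variables)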
View File

@@ -19,7 +19,6 @@ import lombok.experimental.SuperBuilder;
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@EqualsAndHashCode
//@TriggersDataFilterValidation
@Schema(
title = "Display Execution data in a dashboard chart.",
description = "Execution data can be displayed in charts broken out by Namespace and filtered by State, for example."

View File

@@ -21,6 +21,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
@@ -68,6 +69,7 @@ import java.util.Optional;
)
}
)
@Slf4j
public class Exit extends Task implements ExecutionUpdatableTask {
@NotNull
@Schema(
@@ -104,12 +106,13 @@ public class Exit extends Task implements ExecutionUpdatableTask {
// ends all parents
while (newTaskRun.getParentTaskRunId() != null) {
newTaskRun = newExecution.findTaskRunByTaskRunId(newTaskRun.getParentTaskRunId()).withState(exitState);
newExecution = execution.withTaskRun(newTaskRun);
newExecution = newExecution.withTaskRun(newTaskRun);
}
return newExecution;
} catch (InternalException e) {
// in case we cannot update the last not terminated task run, we ignore it
return execution;
log.warn("Unable to update the taskrun state", e);
return execution.withState(exitState);
}
})
.orElse(execution)

View File

@@ -102,6 +102,14 @@ public class PurgeExecutions extends Task implements RunnableTask<PurgeExecution
@Builder.Default
private Property<Boolean> purgeStorage = Property.ofValue(true);
@Schema(
title = "The size of the bulk delete",
description = "For performance, deletion is made by batch of by default 100 executions/logs/metrics."
)
@Builder.Default
@NotNull
private Property<Integer> batchSize = Property.ofValue(100);
@Override
public PurgeExecutions.Output run(RunContext runContext) throws Exception {
ExecutionService executionService = ((DefaultRunContext)runContext).getApplicationContext().getBean(ExecutionService.class);
@@ -124,9 +132,10 @@ public class PurgeExecutions extends Task implements RunnableTask<PurgeExecution
flowInfo.tenantId(),
renderedNamespace,
runContext.render(flowId).as(String.class).orElse(null),
startDate != null ? ZonedDateTime.parse(runContext.render(startDate).as(String.class).orElseThrow()) : null,
runContext.render(startDate).as(String.class).map(ZonedDateTime::parse).orElse(null),
ZonedDateTime.parse(runContext.render(endDate).as(String.class).orElseThrow()),
this.states == null ? null : runContext.render(this.states).asList(State.Type.class)
this.states == null ? null : runContext.render(this.states).asList(State.Type.class),
runContext.render(this.batchSize).as(Integer.class).orElseThrow()
);
return Output.builder()

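A builder-style sketch of tuning the new batch size; the task id and values are illustrative assumptions, while batchSize, purgeStorage, and Property.ofValue come from the code above:

PurgeExecutions purge = PurgeExecutions.builder()
    .id("purge")                              // illustrative task id
    .type(PurgeExecutions.class.getName())
    .purgeStorage(Property.ofValue(true))
    .batchSize(Property.ofValue(500))         // delete in batches of 500 instead of the default 100
    .build();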
View File

@@ -208,48 +208,50 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
return Optional.empty();
}
boolean isOutputsAllowed = runContext
.<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
.orElse(true);
final Output.OutputBuilder builder = Output.builder()
.executionId(execution.getId())
.state(execution.getState().getCurrent());
final Map<String, Object> subflowOutputs = Optional
.ofNullable(flow.getOutputs())
.map(outputs -> outputs
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
)
)
.orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
VariablesService variablesService = ((DefaultRunContext) runContext).getApplicationContext().getBean(VariablesService.class);
if (subflowOutputs != null) {
try {
Map<String, Object> outputs = runContext.render(subflowOutputs);
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacky
if (flow.getOutputs() != null && flowInputOutput != null) {
outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
}
builder.outputs(outputs);
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
var state = State.Type.fail(this);
Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
taskRun = taskRun
.withState(state)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
.withOutputs(variables);
if (this.wait) { // we only compute outputs if we wait for the subflow
boolean isOutputsAllowed = runContext
.<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
.orElse(true);
return Optional.of(SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(State.Type.FAILED)
.parentTaskRun(taskRun)
.build());
final Map<String, Object> subflowOutputs = Optional
.ofNullable(flow.getOutputs())
.map(outputs -> outputs
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
)
)
.orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
if (subflowOutputs != null) {
try {
Map<String, Object> outputs = runContext.render(subflowOutputs);
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacky
if (flow.getOutputs() != null && flowInputOutput != null) {
outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
}
builder.outputs(outputs);
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
var state = State.Type.fail(this);
Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
taskRun = taskRun
.withState(state)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
.withOutputs(variables);
return Optional.of(SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(State.Type.FAILED)
.parentTaskRun(taskRun)
.build());
}
}
}

View File

@@ -102,7 +102,7 @@ public class Switch extends Task implements FlowableTask<Switch.Output> {
@Schema(
title = "The map of keys and a list of tasks to be executed if the conditional `value` matches the key."
)
@PluginProperty
@PluginProperty(additionalProperties = Task[].class)
private Map<String, List<Task>> cases;
@Valid

View File

@@ -20,8 +20,6 @@ import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@@ -60,7 +58,15 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
public class Download extends AbstractHttp implements RunnableTask<Download.Output> {
@Schema(title = "Should the task fail when downloading an empty file.")
@Builder.Default
private final Property<Boolean> failOnEmptyResponse = Property.ofValue(true);
private Property<Boolean> failOnEmptyResponse = Property.ofValue(true);
@Schema(
title = "Name of the file inside the output.",
description = """
If not provided, the filename will be extracted from the `Content-Disposition` header.
If there is no `Content-Disposition` header, a name will be generated."""
)
private Property<String> saveAs;
public Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
@@ -111,20 +117,22 @@ public class Download extends AbstractHttp implements RunnableTask<Download.Outp
}
}
String filename = null;
if (response.getHeaders().firstValue("Content-Disposition").isPresent()) {
String contentDisposition = response.getHeaders().firstValue("Content-Disposition").orElseThrow();
filename = filenameFromHeader(runContext, contentDisposition);
}
if (filename != null) {
filename = URLEncoder.encode(filename, StandardCharsets.UTF_8);
String rFilename = runContext.render(this.saveAs).as(String.class).orElse(null);
if (rFilename == null) {
if (response.getHeaders().firstValue("Content-Disposition").isPresent()) {
String contentDisposition = response.getHeaders().firstValue("Content-Disposition").orElseThrow();
rFilename = filenameFromHeader(runContext, contentDisposition);
if (rFilename != null) {
rFilename = rFilename.replace(' ', '+');
}
}
}
logger.debug("File '{}' downloaded with size '{}'", from, size);
return Output.builder()
.code(response.getStatus().getCode())
.uri(runContext.storage().putFile(tempFile, filename))
.uri(runContext.storage().putFile(tempFile, rFilename))
.headers(response.getHeaders().map())
.length(size.get())
.build();
@@ -173,8 +181,8 @@ public class Download extends AbstractHttp implements RunnableTask<Download.Outp
if (path.indexOf('/') != -1) {
path = path.substring(path.lastIndexOf('/')); // keep the last segment
}
if (path.indexOf('.') != -1) {
return path.substring(path.indexOf('.'));
if (path.lastIndexOf('.') != -1) {
return path.substring(path.lastIndexOf('.'));
}
return null;
}

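A builder-style sketch of the new saveAs override; the URL and task id are illustrative, and the uri property is assumed to be inherited from AbstractHttp:

Download download = Download.builder()
    .id("download")                                        // illustrative task id
    .type(Download.class.getName())
    .uri(Property.ofValue("https://example.com/export"))   // assumed AbstractHttp property
    .saveAs(Property.ofValue("export.csv"))                // takes precedence over Content-Disposition
    .build();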
View File

@@ -9,11 +9,11 @@ import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.codehaus.commons.nullanalysis.NotNull;
import java.util.NoSuchElementException;

View File

@@ -103,8 +103,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {
KVStore kvStore = runContext.namespaceKv(renderedNamespace);
if (kvType != null && renderedValue instanceof String renderedValueStr) {
renderedValue = switch (runContext.render(kvType).as(KVType.class).orElseThrow()) {
if (kvType != null) {
KVType renderedKvType = runContext.render(kvType).as(KVType.class).orElseThrow();
if (renderedValue instanceof String renderedValueStr) {
renderedValue = switch (renderedKvType) {
case NUMBER -> JacksonMapper.ofJson().readValue(renderedValueStr, Number.class);
case BOOLEAN -> Boolean.parseBoolean((String) renderedValue);
case DATETIME, DATE -> Instant.parse(renderedValueStr);
@@ -112,7 +114,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {
case JSON -> JacksonMapper.toObject(renderedValueStr);
default -> renderedValue;
};
} else if (renderedValue instanceof Number valueNumber && renderedKvType == KVType.STRING) {
renderedValue = valueNumber.toString();
}
}
kvStore.put(renderedKey, new KVValueAndMetadata(
new KVMetadata(

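A sketch of the widened coercion rules from the caller's perspective; the key, value, and task id builder fields are assumptions inferred from the rendered variables above, while kvType is shown in the code:

// With kvType = NUMBER, a String value is parsed into a Number;
// with kvType = STRING, a Number value is now stringified.
Set set = Set.builder()
    .id("set-kv")                              // illustrative task id
    .type(Set.class.getName())
    .key(Property.ofValue("retention"))        // assumed builder field
    .value(Property.ofValue("42"))             // String "42"
    .kvType(Property.ofValue(KVType.NUMBER))   // stored as the number 42
    .build();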
View File

@@ -56,7 +56,8 @@ public class OverrideRetryInterceptor implements MethodInterceptor<Object, Objec
retry.get("delay", Duration.class).orElse(Duration.ofSeconds(1)),
retry.get("maxDelay", Duration.class).orElse(null),
new DefaultRetryPredicate(resolveIncludes(retry, "includes"), resolveIncludes(retry, "excludes")),
Throwable.class
Throwable.class,
0
);
MutableConvertibleValues<Object> attrs = context.getAttributes();

View File

@@ -0,0 +1,11 @@
<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
<g clip-path="url(#clip0_1765_9330)">
<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
</g>
</svg>


View File

@@ -0,0 +1,11 @@
<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
<g clip-path="url(#clip0_1765_9330)">
<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
</g>
</svg>


View File

@@ -112,7 +112,7 @@ class JsonSchemaGeneratorTest {
var requiredWithDefault = definitions.get("io.kestra.core.docs.JsonSchemaGeneratorTest-RequiredWithDefault");
assertThat(requiredWithDefault, is(notNullValue()));
assertThat((List<String>) requiredWithDefault.get("required"), not(contains("requiredWithDefault")));
assertThat((List<String>) requiredWithDefault.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
var properties = (Map<String, Map<String, Object>>) flow.get("properties");
var listeners = properties.get("listeners");
@@ -253,7 +253,7 @@ class JsonSchemaGeneratorTest {
void requiredAreRemovedIfThereIsADefault() {
Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, RequiredWithDefault.class);
assertThat(generate, is(not(nullValue())));
assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault")));
assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
assertThat((List<String>) generate.get("required"), containsInAnyOrder("requiredWithNoDefault"));
}
@@ -466,6 +466,11 @@ class JsonSchemaGeneratorTest {
@Builder.Default
private Property<TaskWithEnum.TestClass> requiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test").build());
@PluginProperty
@NotNull
@Builder.Default
private Property<TaskWithEnum.TestClass> anotherRequiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test2").build());
@PluginProperty
@NotNull
private Property<TaskWithEnum.TestClass> requiredWithNoDefault;

View File

@@ -44,6 +44,7 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.USER;
@@ -392,6 +393,20 @@ public abstract class AbstractExecutionRepositoryTest {
assertThat(full.isPresent()).isFalse();
}
@Test
protected void purgeExecutions() {
var execution1 = ExecutionFixture.EXECUTION_1;
executionRepository.save(execution1);
var execution2 = ExecutionFixture.EXECUTION_2;
executionRepository.save(execution2);
var results = executionRepository.purge(List.of(execution1, execution2));
assertThat(results).isEqualTo(2);
assertThat(executionRepository.findById(null, execution1.getId())).isEmpty();
assertThat(executionRepository.findById(null, execution2.getId())).isEmpty();
}
@Test
protected void delete() {
executionRepository.save(ExecutionFixture.EXECUTION_1);
@@ -740,4 +755,16 @@ public abstract class AbstractExecutionRepositoryTest {
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
assertThat(executions.size()).isEqualTo(0L);
}
@Test
protected void shouldReturnLastExecutionsWhenInputsAreNull() {
inject();
List<Execution> lastExecutions = executionRepository.lastExecutions(MAIN_TENANT, null);
assertThat(lastExecutions).isNotEmpty();
Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet());
assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
}
}

View File

@@ -113,7 +113,8 @@ public abstract class AbstractExecutionServiceTest {
flow.getId(),
null,
ZonedDateTime.now(),
null
null,
100
);
assertThat(purge.getExecutionsCount()).isEqualTo(1);
@@ -131,7 +132,8 @@ public abstract class AbstractExecutionServiceTest {
flow.getId(),
null,
ZonedDateTime.now(),
null
null,
100
);
assertThat(purge.getExecutionsCount()).isZero();

View File

@@ -281,18 +281,6 @@ public abstract class AbstractFlowRepositoryTest {
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT);
}
@Test
void findByNamespace() {
List<Flow> save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests");
assertThat((long) save.size()).isEqualTo(Helpers.FLOWS_COUNT - 22);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests2");
assertThat((long) save.size()).isEqualTo(1L);
save = flowRepository.findByNamespace(MAIN_TENANT, "io.kestra.tests.minimal.bis");
assertThat((long) save.size()).isEqualTo(1L);
}
@Test
void findByNamespacePrefix() {
List<Flow> save = flowRepository.findByNamespacePrefix(MAIN_TENANT, "io.kestra.tests");
@@ -609,13 +597,7 @@ public abstract class AbstractFlowRepositoryTest {
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.CREATE).count()).isEqualTo(1L);
assertThat(FlowListener.getEmits().stream().filter(r -> r.getType() == CrudEventType.DELETE).count()).isEqualTo(1L);
}
@Test
void findDistinctNamespace() {
List<String> distinctNamespace = flowRepository.findDistinctNamespace(MAIN_TENANT);
assertThat((long) distinctNamespace.size()).isEqualTo(8L);
}
@Test
protected void shouldReturnNullRevisionForNonExistingFlow() {
assertThat(flowRepository.lastRevision(TEST_TENANT_ID, TEST_NAMESPACE, IdUtils.create())).isNull();

View File

@@ -13,6 +13,8 @@ import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.dashboard.data.Executions;
import io.kestra.plugin.core.dashboard.data.Logs;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
@@ -41,12 +43,16 @@ public abstract class AbstractLogRepositoryTest {
@Inject
protected LogRepositoryInterface logRepository;
protected static LogEntry.LogEntryBuilder logEntry(Level level) {
private static LogEntry.LogEntryBuilder logEntry(Level level) {
return logEntry(level, "executionId");
}
protected static LogEntry.LogEntryBuilder logEntry(Level level, String executionId) {
return LogEntry.builder()
.flowId("flowId")
.namespace("io.kestra.unittest")
.taskId("taskId")
.executionId("executionId")
.executionId(executionId)
.taskRunId(IdUtils.create())
.attemptNumber(0)
.timestamp(Instant.now())
@@ -343,4 +349,15 @@ public abstract class AbstractLogRepositoryTest {
assertThat(results).hasSize(1);
}
@Test
void purge() {
logRepository.save(logEntry(Level.INFO, "execution1").build());
logRepository.save(logEntry(Level.INFO, "execution1").build());
logRepository.save(logEntry(Level.INFO, "execution2").build());
logRepository.save(logEntry(Level.INFO, "execution2").build());
var result = logRepository.purge(List.of(Execution.builder().id("execution1").build(), Execution.builder().id("execution2").build()));
assertThat(result).isEqualTo(4);
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.repositories;
import com.devskiller.friendly_id.FriendlyId;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.executions.TaskRun;
@@ -11,6 +12,7 @@ import io.micronaut.data.model.Pageable;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;
import java.time.Duration;
import java.time.ZonedDateTime;
@@ -115,6 +117,17 @@ public abstract class AbstractMetricRepositoryTest {
assertThat(results).hasSize(3);
}
@Test
void purge() {
metricRepository.save(MetricEntry.of(taskRun("execution1", "task"), counter("counter1"), null));
metricRepository.save(MetricEntry.of(taskRun("execution1", "task"), counter("counter2"), null));
metricRepository.save(MetricEntry.of(taskRun("execution2", "task"), counter("counter1"), null));
metricRepository.save(MetricEntry.of(taskRun("execution2", "task"), counter("counter2"), null));
var result = metricRepository.purge(List.of(Execution.builder().id("execution1").build(), Execution.builder().id("execution2").build()));
assertThat(result).isEqualTo(4);
}
private Counter counter(String metricName) {
return Counter.of(metricName, 1);
}

View File

@@ -387,6 +387,13 @@ public abstract class AbstractRunnerTest {
forEachItemCaseTest.forEachItemInIf();
}
@Test
@LoadFlows({"flows/valids/for-each-item-subflow-after-execution.yaml",
"flows/valids/for-each-item-after-execution.yaml"})
protected void forEachItemWithAfterExecution() throws Exception {
forEachItemCaseTest.forEachItemWithAfterExecution();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
void concurrencyCancel() throws Exception {
@@ -423,6 +430,24 @@ public abstract class AbstractRunnerTest {
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
protected void concurrencyQueueRestarted() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
void concurrencyQueueAfterExecution() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"})
void flowConcurrencySubflow() throws Exception {
flowConcurrencyCaseTest.flowConcurrencySubflow();
}
@Test
@ExecuteFlow("flows/valids/executable-fail.yml")
void badExecutable(Execution execution) {

View File

@@ -0,0 +1,58 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.junit.annotations.KestraTest;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.annotation.Requires;
import io.micronaut.test.annotation.MockBean;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.junit.jupiter.api.Test;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class CustomVariableRendererTest {
@Inject
private SecureVariableRendererFactory secureVariableRendererFactory;
@Inject
private VariableRenderer renderer;
@Test
void shouldUseCustomVariableRender() throws IllegalVariableEvaluationException {
// When
String result = renderer.render("{{ dummy }}", Map.of());
// Then
assertThat(result).isEqualTo("alternativeRender");
}
@Test
void shouldUseCustomVariableRenderWhenUsingSecured() throws IllegalVariableEvaluationException {
// Given
VariableRenderer renderer = secureVariableRendererFactory.createOrGet();
// When
String result = renderer.render("{{ dummy }}", Map.of());
// Then
assertThat(result).isEqualTo("alternativeRender");
}
@MockBean(VariableRenderer.class)
VariableRenderer testCustomRenderer(ApplicationContext applicationContext) {
return new VariableRenderer(applicationContext, null) {
@Override
protected String alternativeRender(Exception e, String inline, Map<String, Object> variables) {
return "alternativeRender";
}
};
}
}

View File

@@ -119,6 +119,7 @@ class ExecutionServiceTest {
assertThat(restart.getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList().stream().filter(taskRun -> taskRun.getState().getCurrent() == State.Type.RESTARTED).count()).isGreaterThan(1L);
assertThat(restart.getTaskRunList().stream().filter(taskRun -> taskRun.getState().getCurrent() == State.Type.RUNNING).count()).isGreaterThan(1L);
assertThat(restart.getTaskRunList().getFirst().getId()).isEqualTo(execution.getTaskRunList().getFirst().getId());
assertThat(restart.getLabels()).contains(new Label(Label.RESTARTED, "true"));
}
@@ -413,9 +414,9 @@ class ExecutionServiceTest {
Execution killed = executionService.kill(execution, flow);
assertThat(killed.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(killed.getState().getCurrent()).isEqualTo(State.Type.KILLING);
assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
assertThat(killed.getState().getHistories()).hasSize(4);
assertThat(killed.getState().getHistories()).hasSize(5);
}
@Test
@@ -442,4 +443,22 @@ class ExecutionServiceTest {
assertThat(killed.findTaskRunsByTaskId("pause").getFirst().getState().getCurrent()).isEqualTo(State.Type.KILLED);
assertThat(killed.getState().getHistories()).hasSize(5);
}
@Test
@LoadFlows({"flows/valids/change-state-errors.yaml"})
void changeStateWithErrorBranch() throws Exception {
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "change-state-errors");
Flow flow = flowRepository.findByExecution(execution);
assertThat(execution.getTaskRunList()).hasSize(3);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
Execution restart = executionService.changeTaskRunState(execution, flow, execution.findTaskRunsByTaskId("make_error").getFirst().getId(), State.Type.SUCCESS);
assertThat(restart.getState().getCurrent()).isEqualTo(State.Type.RESTARTED);
assertThat(restart.getMetadata().getAttemptNumber()).isEqualTo(2);
assertThat(restart.getState().getHistories()).hasSize(4);
assertThat(restart.getTaskRunList()).hasSize(2);
assertThat(restart.findTaskRunsByTaskId("make_error").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
}

View File

@@ -9,6 +9,7 @@ import io.micronaut.context.annotation.Property;
import jakarta.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.ByteArrayInputStream;
import java.io.File;
@@ -97,6 +98,35 @@ class FilesServiceTest {
assertThat(outputs.size()).isEqualTo(1);
}
@Test
void testOutputFilesWithSpecialCharacters(@TempDir Path tempDir) throws Exception {
var runContext = runContextFactory.of();
Path fileWithSpace = tempDir.resolve("with space.txt");
Path fileWithUnicode = tempDir.resolve("สวัสดี.txt");
Files.writeString(fileWithSpace, "content");
Files.writeString(fileWithUnicode, "content");
Path targetFileWithSpace = runContext.workingDir().path().resolve("with space.txt");
Path targetFileWithUnicode = runContext.workingDir().path().resolve("สวัสดี.txt");
Files.copy(fileWithSpace, targetFileWithSpace);
Files.copy(fileWithUnicode, targetFileWithUnicode);
Map<String, URI> outputFiles = FilesService.outputFiles(
runContext,
List.of("with space.txt", "สวัสดี.txt")
);
assertThat(outputFiles).hasSize(2);
assertThat(outputFiles).containsKey("with space.txt");
assertThat(outputFiles).containsKey("สวัสดี.txt");
assertThat(runContext.storage().getFile(outputFiles.get("with space.txt"))).isNotNull();
assertThat(runContext.storage().getFile(outputFiles.get("สวัสดี.txt"))).isNotNull();
}
private URI createFile() throws IOException {
File tempFile = File.createTempFile("file", ".txt");
Files.write(tempFile.toPath(), "Hello World".getBytes());

View File

@@ -8,6 +8,7 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
@@ -53,6 +54,9 @@ public class FlowConcurrencyCaseTest {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
private ExecutionService executionService;
public void flowConcurrencyCancel() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel");
@@ -278,6 +282,161 @@ public class FlowConcurrencyCaseTest {
assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
}
public void flowConcurrencyQueueRestarted() throws Exception {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
var executionResult1 = new AtomicReference<Execution>();
var executionResult2 = new AtomicReference<Execution>();
CountDownLatch latch1 = new CountDownLatch(2);
AtomicReference<Execution> failedExecution = new AtomicReference<>();
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
executionResult1.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == Type.FAILED) {
failedExecution.set(e.getLeft());
latch1.countDown();
}
}
if (e.getLeft().getId().equals(execution2.getId())) {
executionResult2.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
latch2.countDown();
}
if (e.getLeft().getState().getCurrent() == Type.FAILED) {
latch3.countDown();
}
}
});
assertTrue(latch2.await(1, TimeUnit.MINUTES));
assertThat(failedExecution.get()).isNotNull();
// here the first execution failed and the second is now running.
// we restart the first one; it should be queued, then fail again.
Execution restarted = executionService.restart(failedExecution.get(), null);
executionQueue.emit(restarted);
assertTrue(latch3.await(1, TimeUnit.MINUTES));
assertTrue(latch1.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(Type.FAILED);
// it should have been queued after being restarted
assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED)).isTrue();
assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.QUEUED)).isTrue();
assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(Type.FAILED);
assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencyQueueAfterExecution() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
var executionResult1 = new AtomicReference<Execution>();
var executionResult2 = new AtomicReference<Execution>();
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
executionResult1.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch1.countDown();
}
}
if (e.getLeft().getId().equals(execution2.getId())) {
executionResult2.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
latch2.countDown();
}
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch3.countDown();
}
}
});
assertTrue(latch1.await(1, TimeUnit.MINUTES));
assertTrue(latch2.await(1, TimeUnit.MINUTES));
assertTrue(latch3.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencySubflow() throws TimeoutException, QueueException, InterruptedException {
CountDownLatch successLatch = new CountDownLatch(1);
CountDownLatch canceledLatch = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getFlowId().equals("flow-concurrency-cancel")) {
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
successLatch.countDown();
}
if (e.getLeft().getState().getCurrent() == Type.CANCELLED) {
canceledLatch.countDown();
}
}
// FIXME we should fail if we receive the cancelled execution again, but on Kafka it happens
});
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-subflow", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-subflow");
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(Type.SUCCESS);
// assert we have one canceled subflow and one successful
assertTrue(canceledLatch.await(1, TimeUnit.MINUTES));
assertTrue(successLatch.await(1, TimeUnit.MINUTES));
receive.blockLast();
// run another execution to be sure that everything works (the purge was correctly done)
CountDownLatch newSuccessLatch = new CountDownLatch(1);
Flux<Execution> secondReceive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getFlowId().equals("flow-concurrency-cancel")) {
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
newSuccessLatch.countDown();
}
}
// FIXME we should fail if we receive the cancelled execution again, but on Kafka it happens
});
Execution execution3 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-subflow");
assertThat(execution3.getState().getCurrent()).isEqualTo(Type.SUCCESS);
// assert we have two successful subflows
assertTrue(newSuccessLatch.await(1, TimeUnit.MINUTES));
secondReceive.blockLast();
}
private URI storageUpload() throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");

View File

@@ -83,4 +83,24 @@ class RunContextPropertyTest {
runContextProperty = new RunContextProperty<>(Property.<Map<String, String>>builder().expression("{ \"key\": \"{{ key }}\"}").build(), runContext);
assertThat(runContextProperty.asMap(String.class, String.class, Map.of("key", "value"))).containsEntry("key", "value");
}
@Test
void asShouldReturnCachedRenderedProperty() throws IllegalVariableEvaluationException {
var runContext = runContextFactory.of();
var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
assertThat(runContextProperty.as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value1"));
}
@Test
void asShouldNotReturnCachedRenderedPropertyWithSkipCache() throws IllegalVariableEvaluationException {
var runContext = runContextFactory.of();
var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
assertThat(runContextProperty.skipCache().as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value2"));
}
}

View File

@@ -140,7 +140,7 @@ class RunContextTest {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
Flux<LogEntry> receive = TestsUtils.receive(workerTaskLogQueue, either -> logs.add(either.getLeft()));
char[] chars = new char[1024 * 11];
char[] chars = new char[1024 * 16];
Arrays.fill(chars, 'a');
Map<String, Object> inputs = new HashMap<>(InputsTest.inputs);

View File

@@ -3,7 +3,6 @@ package io.kestra.core.runners;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.micronaut.context.annotation.Property;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

View File

@@ -0,0 +1,270 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.secret.SecretNotFoundException;
import io.kestra.core.secret.SecretService;
import io.micronaut.test.annotation.MockBean;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* Unit tests for SecureVariableRendererFactory.
*
* This class tests the factory's ability to create debug renderers that:
* - Properly mask secret functions
* - Maintain security by preventing secret value leakage
* - Delegate to the base renderer for non-secret operations
* - Handle errors appropriately
*/
@KestraTest
class SecureVariableRendererFactoryTest {
@Inject
private SecureVariableRendererFactory secureVariableRendererFactory;
@Inject
private VariableRenderer renderer;
@MockBean(SecretService.class)
SecretService testSecretService() {
return new SecretService() {
@Override
public String findSecret(String tenantId, String namespace, String key) throws SecretNotFoundException, IOException {
return switch (key) {
case "MY_SECRET" -> "my-secret-value-12345";
case "API_KEY" -> "api-key-value-67890";
case "DB_PASSWORD" -> "db-password-secret";
case "TOKEN" -> "token-value-abc123";
case "KEY1" -> "secret-value-1";
case "KEY2" -> "secret-value-2";
case "JSON_SECRET" -> "{\"api_key\": \"secret123\", \"token\": \"token456\"}";
default -> throw new SecretNotFoundException("Secret not found: " + key);
};
}
};
}
@Test
void shouldCreateDebugRenderer() {
// When
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
// Then
assertThat(debugRenderer).isNotNull();
}
@Test
void shouldCreateDebugRendererThatIsNotSameAsBaseRenderer() {
// When
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
// Then
assertThat(debugRenderer).isNotSameAs(renderer);
}
@Test
void shouldCreateDebugRendererThatMasksSecrets() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When
String result = debugRenderer.render("{{ secret('MY_SECRET') }}", context);
// Then
assertThat(result).isEqualTo("******");
assertThat(result).doesNotContain("my-secret-value-12345");
}
@Test
void shouldCreateDebugRendererThatMasksMultipleSecrets() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When
String result = debugRenderer.render(
"API: {{ secret('API_KEY') }}, DB: {{ secret('DB_PASSWORD') }}, Token: {{ secret('TOKEN') }}",
context
);
// Then
assertThat(result).isEqualTo("API: ******, DB: ******, Token: ******");
assertThat(result).doesNotContain("api-key-value-67890");
assertThat(result).doesNotContain("db-password-secret");
assertThat(result).doesNotContain("token-value-abc123");
}
@Test
void shouldCreateDebugRendererThatDoesNotMaskNonSecretVariables() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"username", "testuser",
"email", "test@example.com",
"count", 42
);
// When
String result = debugRenderer.render(
"User: {{ username }}, Email: {{ email }}, Count: {{ count }}",
context
);
// Then
assertThat(result).isEqualTo("User: testuser, Email: test@example.com, Count: 42");
}
@Test
void shouldCreateDebugRendererThatMasksOnlySecretFunctions() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest"),
"username", "testuser",
"environment", "production"
);
// When
String result = debugRenderer.render(
"User: {{ username }}, Env: {{ environment }}, Secret: {{ secret('MY_SECRET') }}",
context
);
// Then
assertThat(result).isEqualTo("User: testuser, Env: production, Secret: ******");
assertThat(result).contains("testuser");
assertThat(result).contains("production");
assertThat(result).doesNotContain("my-secret-value-12345");
}
@Test
void shouldCreateDebugRendererThatHandlesMissingSecrets() {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When/Then
assertThatThrownBy(() -> debugRenderer.render("{{ secret('NON_EXISTENT_SECRET') }}", context))
.isInstanceOf(IllegalVariableEvaluationException.class)
.hasMessageContaining("Secret not found: NON_EXISTENT_SECRET");
}
@Test
void shouldCreateDebugRendererThatMasksSecretsInComplexExpressions() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When
String result = debugRenderer.render(
"{{ 'API Key: ' ~ secret('API_KEY') }}",
context
);
// Then
assertThat(result).isEqualTo("API Key: ******");
assertThat(result).doesNotContain("api-key-value-67890");
}
@Test
void shouldCreateDebugRendererThatMasksSecretsInConditionals() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When
String result = debugRenderer.render(
"{{ secret('MY_SECRET') is defined ? 'Secret exists' : 'No secret' }}",
context
);
// Then
assertThat(result).isEqualTo("Secret exists");
assertThat(result).doesNotContain("my-secret-value-12345");
}
@Test
void shouldCreateDebugRendererThatMasksSecretsWithSubkeys() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When
String result = debugRenderer.render(
"{{ secret('JSON_SECRET', subkey='api_key') }}",
context
);
// Then
assertThat(result).isEqualTo("******");
assertThat(result).doesNotContain("secret123");
}
@Test
void shouldCreateDebugRendererThatHandlesEmptyContext() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> emptyContext = Map.of();
// When
String result = debugRenderer.render("Hello World", emptyContext);
// Then
assertThat(result).isEqualTo("Hello World");
}
@Test
void shouldCreateDebugRendererThatHandlesNullValues() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"value", "test"
);
// When
String result = debugRenderer.render("{{ value }}", context);
// Then
assertThat(result).isEqualTo("test");
}
@Test
void shouldCreateDebugRendererThatMasksSecretsInNestedRender() throws IllegalVariableEvaluationException {
// Given
VariableRenderer debugRenderer = secureVariableRendererFactory.createOrGet();
Map<String, Object> context = Map.of(
"flow", Map.of("namespace", "io.kestra.unittest")
);
// When - Using concatenation to avoid immediate evaluation
String result = debugRenderer.render(
"{{ render('{{s'~'ecret(\"MY_SECRET\")}}') }}",
context
);
// Then
assertThat(result).isEqualTo("******");
assertThat(result).doesNotContain("my-secret-value-12345");
}
}

View File

@@ -5,6 +5,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.queues.QueueException;
@@ -18,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -77,8 +79,12 @@ public class TaskCacheTest {
@Plugin
public static class CounterTask extends Task implements RunnableTask<CounterTask.Output> {
private String workingDir;
@Override
public Output run(RunContext runContext) throws Exception {
Map<String, Object> variables = Map.of("workingDir", runContext.workingDir().path().toString());
runContext.render(this.workingDir, variables);
return Output.builder()
.counter(COUNTER.incrementAndGet())
.build();

Some files were not shown because too many files have changed in this diff.