Compare commits

...

144 Commits

Author SHA1 Message Date
Roman Acevedo
2eed738b83 ci: add skip test param to pre-release.yml 2025-10-28 17:54:26 +01:00
brian.mulier
5e2609ce5e chore(version): update to version '1.0.8' 2025-10-28 14:37:22 +01:00
Florian Hussonnois
86f909ce93 fix(flows): KV pebble expressions with input defaults (#12314)
Fixes: #12314
2025-10-28 14:32:44 +01:00
Loïc Mathieu
a8cb28a127 fix(executions): remove errors and finally tasks when restarting
Otherwize we would detect that an error or a finally branch is processing and the flowable state would not be correctly taken.

Moreover, it prevent this branch to be taken again after a restart.

Fixes #11731
2025-10-28 14:30:27 +01:00
brian.mulier
0fe9ba3e13 fix(tests): was missing some utils 2025-10-28 12:31:59 +01:00
brian-mulier-p
40f5aadd1a fix(kv): don't throw in KV function with errorOnMissing=false for expired kv (#12321)
closes #12294
2025-10-24 11:42:02 +02:00
Bart Ledoux
ceac25429a fix(ui): update ui-libs to make docs work
closes #12252
2025-10-23 12:24:13 +02:00
Bart Ledoux
4144d9fbb1 build: avoid using posthog in development 2025-10-23 12:21:41 +02:00
Florian Hussonnois
9cc7d45f74 fix(core): allow secrets to be render for multiselect (#12045)
Fix: #12045
2025-10-23 11:32:21 +02:00
Florian Hussonnois
81ee330b9e fix(core): ignore not found plugin types for schema generation 2025-10-23 11:32:10 +02:00
Hemant M Mehta
5382655a2e fix: file-download-issue (#11774)
* fix: file-download-issue

closes: #11569

* fix: test case

Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>

---------

Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-22 11:49:54 +02:00
github-actions[bot]
483f7dc3b2 chore(version): update to version '1.0.7' 2025-10-21 12:03:05 +00:00
Piyush Bhaskar
3c2da63837 fix(core): handle 404 error in kv retrieval with message (#12191) 2025-10-21 15:19:47 +05:30
Nicolas K.
31527891b2 feat(flows): add truncate parameter for log shipper (#12131)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-10-21 11:06:51 +02:00
Roman Acevedo
6364f419d9 fix(flows): allow using OSS CLI to deploy EE flows
- fixes https://github.com/kestra-io/kestra-ee/issues/5490
2025-10-21 09:33:15 +02:00
Irfan
3c14432412 feat(plugins): enhance documentation request handling to prevent unnecessary reloads (#11911)
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
Co-authored-by: iitzIrFan <irfanlhawk@gmail.com>
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-10-17 11:48:25 +02:00
YannC
eaea4f5012 Fix/validate endpoint fix (#12121)
* fix: validateTask & validateTrigger endpoint changes for SDK

* fix: validateTask & validateTrigger endpoint changes for SDK
2025-10-17 11:12:18 +02:00
Roman Acevedo
d43390a579 fix(flows): allow using OSS CLI to validate EE flows (#12104)
* fix(flows): allow using OSS CLI to validate EE flows

https://github.com/kestra-io/kestra/pull/12047 was not enough

- fixxes https://github.com/kestra-io/kestra-ee/issues/5455

* f
2025-10-16 19:34:02 +02:00
Roman Acevedo
2404c36d35 fix(flows): allow using OSS CLI to validate EE flows
- fixes https://github.com/kestra-io/kestra-ee/issues/5455
2025-10-16 18:55:39 +02:00
Miloš Paunović
bdbd217171 fix(iam): prevent infinite loop when permissions are missing while loading custom blueprints (#12092)
Closes https://github.com/kestra-io/kestra-ee/issues/5405.
2025-10-16 14:39:05 +02:00
brian-mulier-p
019c16af3c feat(ai): add PEM Certificate handling to GeminiAiService (#11739)
closes kestra-io/kestra-ee#5342
2025-10-15 14:13:19 +02:00
Hemant M Mehta
ff7d7c6a0b fix(executions): properly handle filename with special chars (#11814)
* fix: artifact-filename-validation

closes: #10802

* fix: test

Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>

* fix: test

Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>

* fix: test

* fix(core): use deterministic file naming in FilesService

---------

Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-15 09:28:53 +02:00
github-actions[bot]
1042be87da chore(version): update to version '1.0.6' 2025-10-14 12:30:55 +00:00
brian-mulier-p
104805d780 fix(flows): pebble autocompletion performance optimization (#11981)
closes #11881
2025-10-14 11:37:46 +02:00
YannC
33c8e54f36 Fix: openapi tweaks (#11929)
* fix: added some on @ApiResponse annotation + added nullable annotation for TaskRun class

* fix: review changes
2025-10-13 18:05:38 +02:00
nKwiatkowski
ff2e00d1ca feat(tests): add flaky tests handling 2025-10-13 17:06:28 +02:00
brian-mulier-p
0fe3f317c7 feat(runners): add syncWorkingDirectory property to remote task runners (#11945)
part of kestra-io/kestra-ee#4761
2025-10-13 11:35:52 +02:00
brian-mulier-p
f753d15c91 feat(runners): add syncWorkingDirectory property to remote task runners (#11602)
part of kestra-io/kestra-ee#4761
2025-10-13 11:35:52 +02:00
brian-mulier-p
c03e31de68 fix(ai): remove thoughts return from AI Copilot (#11935)
closes kestra-io/kestra-ee#5422
2025-10-13 09:56:11 +02:00
Miloš Paunović
9a79f9a64c feat(flows): save editor panel layout after creation (#11276)
Closes https://github.com/kestra-io/kestra/issues/9887.

Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-10-10 12:59:11 +02:00
github-actions[bot]
41468652d4 chore(version): update to version '1.0.5' 2025-10-09 14:03:47 +00:00
Loïc Mathieu
bc182277de fix(system): refactor concurrency limit to use a counter
A counter allow to lock by flow which solves the race when two executions are created at the same time and the executoion_runnings table is empty.

Evaluating concurrency limit on the main executionQueue method also avoid an unexpected behavior where the CREATED execution is processed twice as its status didn't change immediatly when QUEUED.

Closes https://github.com/kestra-io/kestra-ee/issues/4877
2025-10-09 15:40:44 +02:00
Roman Acevedo
8c2271089c test: re enabling shouldGetReport, unflaky it with fixed date 2025-10-08 13:21:06 +02:00
Sanket Mundra
9973a2120b fix(backend): failing /resume/validate endpoint for integer label values (#11688)
* fix: cast label values to string

* fix: use findByIdWithSourceWithoutAcl() instead of findByIdWithoutAcl() and add test

* remove unwanted files
2025-10-08 10:13:31 +02:00
Roman Acevedo
bdfd038d40 ci: change Dockerfile.pr to dynamic version 2025-10-07 19:03:09 +02:00
YannC
a3fd734082 fix: modify annotations to improve openapi spec file generated (#11785) 2025-10-07 16:41:45 +02:00
github-actions[bot]
553a1d5389 chore(version): update to version '1.0.4' 2025-10-07 13:22:11 +00:00
Florian Hussonnois
c58aca967b fix(core): decrypt input secrets passed to exec (#11681) 2025-10-07 12:05:46 +02:00
Florian Hussonnois
27dcf60770 fix(core): obfuscate secrets used as default inputs (#11681)
Make sure values return from pebble function are obfuscate
when return from the input validation endpoints.

Changes:
* UI: Don't send default input values when creating new execution

Fixes: #11681
2025-10-07 12:05:46 +02:00
Roman Acevedo
4e7c75232a test: remove findByNamespace and findDistinctNamespace
they are too hard to maintain
2025-10-07 11:13:31 +02:00
Florian Hussonnois
f452da7ce1 fix(core): catch any exception on schema generation 2025-10-07 09:29:12 +02:00
Florian Hussonnois
43401c5017 fix(core): properly publish CrudEvent for killed execution
Fixes: kestra-io/kestra-ee#5165
2025-10-07 09:29:01 +02:00
Roman Acevedo
067b110cf0 ci: forgot to remove (now unused) actions 2025-10-06 17:40:13 +02:00
Florian Hussonnois
4ceff83a28 fix(core): use primary pebble renderer with masked functions (#11535)
Extract a PebbleEngineFactory class and refactor VariableRenderer to
support engine injection via setter; Delete DebugVariableRenderer.

Fixes: #11535
2025-10-06 17:36:01 +02:00
hemanthsavasere
5026afe5bf refactor(tests): remove outdated README for SecureVariableRendererFactory tests 2025-10-06 17:35:51 +02:00
hemanthsavasere
3c899fcb2f feat(tests): add comprehensive tests for SecureVariableRendererFactory to ensure secret masking functionality 2025-10-06 17:35:26 +02:00
hemanthsavasere
cee412ffa9 feat(execution): add secure variable renderer factory for debug mode
Introduce SecureVariableRendererFactory to create debug renderer instances that wrap the base renderer while maintaining security by masking sensitive functions. This provides a consistent way to handle variable rendering in debug contexts.
2025-10-06 17:35:12 +02:00
Roman Acevedo
3a57a683be ci: migrate CI to kestra-io/actions
- advance on https://github.com/kestra-io/kestra-ee/issues/5363
2025-10-06 17:32:49 +02:00
Roman Acevedo
a0b9de934e fix(kv): revert BC renaming of listKeysWithInheritence 2025-10-06 12:37:30 +02:00
Roman Acevedo
d677317cc5 fix(executions): try to mitigate SSE and debug log SSE errors
- advance on https://github.com/kestra-io/kestra/issues/11608
2025-10-06 12:27:01 +02:00
mustafatarek
9e661195e5 refactor: change iteration to start with 0 2025-10-06 11:29:47 +02:00
mustafatarek
09c921bee5 fix(core): fix ForEach plugin task.iteration property to show the correct number of Iteration 2025-10-06 11:29:12 +02:00
Carlos Longhi
d21ec4e899 fix(core): amend the code color variable value for light mode (#11736)
Closes https://github.com/kestra-io/kestra/issues/11682.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-06 10:45:52 +02:00
Sandip Mandal
efdb25fa97 chore(core): make sure kv listing is filterable (#11536)
Closes https://github.com/kestra-io/kestra/issues/11413.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-04 09:37:46 +02:00
Loïc Mathieu
37bdcc342c fix(executions): purge executions by 100 by default
As 500 may be too much if executions are huge as the batch will be loaded in memory.
2025-10-03 17:00:43 +02:00
Loïc Mathieu
6d35f2b7a6 Revert "fix(core): properly encode filenames with spaces in URI (#11599)"
This reverts commit d02fd53287.
2025-10-03 16:57:00 +02:00
Loïc Mathieu
fe46ddf381 fix(system): compilation issue 2025-10-03 16:17:15 +02:00
Loïc Mathieu
359dc9adc0 feat(executions): improve performance of PurgeExecutions by batch deleting executions, logs and metrics
Closes #11680
2025-10-03 15:30:26 +02:00
Miloš Paunović
39c930124f fix(core): amend add/edit actions from topology view (#11589)
Closes https://github.com/kestra-io/kestra/issues/11408.
Closes https://github.com/kestra-io/kestra/issues/11417.
2025-10-03 14:54:34 +02:00
brian.mulier
1686fc3b4e fix(tests): new namespace was introduced 2025-10-03 14:47:04 +02:00
Loïc Mathieu
03ff25ff55 fix(system): potential NPE in Execution.withTaskRun()
This should never happen as normally we should have taskrun already in place whenever we call this method.

But a user report seeing it and I also already seen it once or two. I think it can happen when there is an unexpected event (like a restart or a bug somewhere else that lead to an execution in an unexpected state) so it's better to fix it to be more resilient.

Fixes #11703
2025-10-03 14:33:58 +02:00
Vedant794
d02fd53287 fix(core): properly encode filenames with spaces in URI (#11599)
* Fix the issue of downloading the file with space in name

* fix(core): encode filenames with spaces in URI and add test

* fix: Indent Issue and remove the empty unnecessary lines

* Resolve the error in DownloadFileTest

* Fix: DownloadFileTest issue

* resolve the weirdName issue
2025-10-03 14:22:19 +02:00
brian.mulier
6c16bbe853 chore(deps): bump langchain4j from 1.6.0 to 1.7.1 2025-10-03 12:06:32 +02:00
Loïc Mathieu
aa7a473d49 fix(executions): evaluate multiple conditions in a separate queue
By evaluating multiple condition in a separate queue, we serialize their evaluation which avoir races when we compute the outputs for flow triggers.
This is because evaluation is a multi step process: first you get the existing condtion, then you evaluate, then you store the result. As this is not guarded by a lock you must not do it concurrently.

The race can still occurs if muiltiple executors run but this is less probable. A re-implementation would be needed probably in 2.0 for that.

Fixes https://github.com/kestra-io/kestra-ee/issues/4602
2025-10-03 11:11:46 +02:00
brian-mulier-p
95133ebc40 fix(core): avoid crashing UI in case of multiline function autocomplete (#11684) 2025-10-03 09:36:55 +02:00
YannC.
54482e1d06 fix: missing import 2025-10-03 09:22:13 +02:00
YannC
54b7811812 fix: set Label schema definition as list of label only, deprecate old… (#11648)
* fix: set Label schema definition as list of label only, deprecate old serdes for it and add schema definition for label

related to kestra-io/client-sdk#62

* fix: Modified the @Schema to avoid remove the map.class definition in schema annotation
2025-10-03 09:05:43 +02:00
YannC
050ad60a95 fix: use filters query instead of deprecated prop to filter by triggerExecutionId when clicking on failed execution of a ForEachItem (#11690) 2025-10-02 23:51:46 +02:00
mustafatarek
030627ba7b refactor(kv): update namespace filtering for readability 2025-10-02 18:18:19 +02:00
mustafatarek
c06ef7958f fix(test): update test assertion for listKeysWithInheritance() to be on ancestor keys only 2025-10-02 18:18:12 +02:00
mustafatarek
692d046289 fix(core): exclude current namespace in listKeysWithInheritance
- Returns only ancestor namespaces
- Handles single-level namespace edge case
- Verified with KVControllerTest
2025-10-02 18:18:06 +02:00
Loïc Mathieu
92c1f04ec0 fix(flows): flow validation could NPE when the id is not set
This is because contains on an unmodified collection throws NPE is the param is null
2025-10-01 16:47:02 +02:00
Loïc Mathieu
9e11d5fe5e fix(system): compilation issue 2025-10-01 12:21:47 +02:00
Loïc Mathieu
14952c9457 fix(executions): killing queued exec. didn't respect concurrency limit
There was two issues here:
- When killing a queued execution, the associated ExecutionQueued record was not deleted
- When terminating a killed execution that has concurrency limit, we poped an execution even if the execution was not running (no associated ExecutionRunning record) which may exceed concurrency limit

Fixes #11574

I also fix the TestRunnerUtils that should test the predicate before returning the last execution not after.
2025-10-01 12:16:04 +02:00
Loïc Mathieu
ae314c301d chore(system): move the SkipExecution service to the services package
It was there before so it will be easier to backport the change if it moves there.
2025-10-01 11:45:27 +02:00
Loïc Mathieu
f8aa5fb6ba feat(system): allow to skip an indexer record
Part-of: https://github.com/kestra-io/kestra-ee/issues/5263
2025-10-01 11:45:15 +02:00
MilosPaunovic
c87d7e4da0 refactor(logs): remove empy line 2025-10-01 09:20:35 +02:00
yuri
c928f1d822 chore(logs): make search queries case-insensitive (#11313)
Execution logs' filter query used to be case-sensitive - for example, the `hello` query did not match `Hello World` log lines.
2025-10-01 09:19:49 +02:00
YannC.
baa07dd02b fix: disabled flakky test shouldGetReport 2025-09-30 13:16:48 +02:00
github-actions[bot]
260cb50651 chore(version): update to version '1.0.3' 2025-09-30 07:07:34 +00:00
YannC
0a45325c69 fix(ui): avoid having a authentication dialog open when credentials are wrong (#11576) 2025-09-30 09:00:55 +02:00
Florian Hussonnois
c2522e2544 fix(triggers): do not resolve recoverMissedSchedule when enabling back a trigger
Add some refactoring to allow some methods to be overrided
2025-09-29 20:43:35 +02:00
Florian Hussonnois
27476279ae fix(triggers): handle RecoverMissedSchedules on trigger batch update
* Fix and clean code in TriggerController
* Remove duplicate code in Trigger class
2025-09-29 20:43:34 +02:00
YannC.
3cc6372cb5 fix: missing import due to backport 2025-09-29 18:09:25 +02:00
YannC
5f6e9dbe06 fix(dashboard): show startDate instead of duration in defaults, and avoid formatting date in JDBC if there is no aggregations (#11467)
close #5867
2025-09-29 17:51:36 +02:00
yuri1969
5078ce741d fix(core): enable runIf at execution updating tasks 2025-09-25 14:46:08 +02:00
github-actions[bot]
b7e17b7114 chore(version): update to version '1.0.2' 2025-09-24 08:03:43 +00:00
nKwiatkowski
acaee34b0e chore(version): update to version '1.0.1' 2025-09-24 10:03:23 +02:00
github-actions[bot]
1d78332505 chore(version): update to version '1.0.2' 2025-09-24 08:02:25 +00:00
nKwiatkowski
7249632510 fix(tests): disable flaky test that prevent the release 2025-09-24 10:01:43 +02:00
Sanjay Ramsinghani
4a66a08c3b chore(core): align toggle icon in failed execution collapse element (#11430)
Closes https://github.com/kestra-io/kestra/issues/11406.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-09-23 14:20:10 +02:00
Antoine Gauthier
22fd6e97ea chore(logs): display copy button only on row hover (#11254)
Closes https://github.com/kestra-io/kestra/issues/11220.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-09-23 14:18:34 +02:00
Jaem Dessources
9afd86d32b fix(core): align copy logs button to each row’s right edge (#11216)
Closes https://github.com/kestra-io/kestra/issues/10898.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-09-23 14:18:28 +02:00
github-actions[bot]
797ea6c9e4 chore(version): update to version '1.0.2' 2025-09-23 12:10:01 +00:00
nKwiatkowski
07d5e815c4 chore(version): update to version '1.0.1' 2025-09-23 14:09:38 +02:00
github-actions[bot]
33ac9b1495 chore(version): update to version '1.0.2' 2025-09-23 09:22:01 +00:00
Bart Ledoux
4d5b95d040 chore: update package-lock 2025-09-23 11:17:48 +02:00
brian-mulier-p
667aca7345 fix(ai): avoid moving cursor twice after using AI Copilot (#11451)
closes #11314
2025-09-23 10:40:32 +02:00
brian.mulier
e05cc65202 fix(system): avoid trigger locking after scheduler restart
closes #11434
2025-09-22 18:40:22 +02:00
brian.mulier
71b606c27c fix(ci): same CI as develop 2025-09-22 18:40:19 +02:00
Florian Hussonnois
47f9f12ce8 chore(websever): make kvStore method in KVController protected
Related-to: kestra-io/kestra-ee#5055
2025-09-22 13:57:59 +02:00
Florian Hussonnois
01acae5e97 feat(core): add new findMetadataAndValue to KVStore
Related-to: kestra-io/kestra-ee#5055
2025-09-22 13:57:58 +02:00
Florian Hussonnois
e5878f08b7 fix(core): fix NPE in JackMapping.applyPatchesOnJsonNode method 2025-09-22 13:57:57 +02:00
brian-mulier-p
0bcb6b4e0d fix(tests): enforce closing consumers after each tests (#11399) 2025-09-19 16:35:23 +02:00
brian-mulier-p
3c2ecf4342 fix(core): avoid ClassCastException when doing secret decryption (#11393)
closes kestra-io/kestra-ee#5191
2025-09-19 11:32:27 +02:00
Piyush Bhaskar
3d4f66772e fix(core: webhook curl coomand needs tenant. 2025-09-19 14:17:00 +05:30
Sandip Mandal
e2afd4bcc3 fix(core: webhook curl coomand needs tenant. (#11391)
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-09-19 14:10:36 +05:30
Loïc Mathieu
d143097f03 fix(executions): computing subflow outputs could fail when the executioin is failing or killing
Fixes https://github.com/kestra-io/kestra/issues/11379
2025-09-18 17:42:15 +02:00
Loïc Mathieu
72c0d91c1a fix(executions): concurrency limit should update the executioin
As if it's not updated in the database, it would not be detected as changed so that terminal actions (like purge) would not be done.

Fixes  #11022
Fixes #11025
Fixes #8143
2025-09-18 12:10:36 +02:00
Loïc Mathieu
1d692e56b0 fix(executions): the Exit task was not correctly ends parent tasks
Fixes https://github.com/kestra-io/kestra-ee/issues/5168
2025-09-18 11:39:16 +02:00
Miloš Paunović
0352d617ac chore(core): improve coloring scheme for dependencies graph (#11306) 2025-09-18 09:22:27 +02:00
Miloš Paunović
b41aa4e0b9 fix(core): adjust positioning of default tour elements (#11286)
The problem occurred when `No Code` was selected as the `Default Editor Type` in `Settings`. This `PR` resolves the issue.

Closes https://github.com/kestra-io/kestra/issues/9556.
2025-09-18 09:21:40 +02:00
Miloš Paunović
d811dc030b chore(core): ensure editor suggestion widget renders above other elements (#11258)
Closes https://github.com/kestra-io/kestra/issues/10702.
Closes https://github.com/kestra-io/kestra/issues/11033.
2025-09-18 09:21:18 +02:00
Miloš Paunović
105e62eee1 fix(namespaces): open details page at top (#11221)
Closes https://github.com/kestra-io/kestra/issues/10536.
2025-09-18 09:20:55 +02:00
Loïc Mathieu
28796862a4 fix(executions): possible NPE on dynamic taskrun
Fixes https://github.com/kestra-io/kestra-ee/issues/5166
2025-09-17 15:56:28 +02:00
brian.mulier
637cd794a4 fix(core): filters weren't applying anymore 2025-09-17 12:57:47 +02:00
Miloš Paunović
fdd5c6e63d chore(core): remove unused decompress library (#11346) 2025-09-17 11:15:43 +02:00
brian.mulier
eda2483ec9 fix(core): avoid filters from overlapping on other pages when changing query params 2025-09-17 10:37:58 +02:00
brian.mulier
7b3c296489 fix(core): avoid clearing filters when reclicking on current left menu item
closes #9476
2025-09-17 10:37:56 +02:00
brian.mulier
fe6f8b4ed9 fix(core): avoid undefined error on refresh chart 2025-09-17 10:37:04 +02:00
Roman Acevedo
17ff539690 ci: fix some non-release workflows were not using develop 2025-09-16 14:43:24 +02:00
Roman Acevedo
bbd0dda47e ci: readd back workflow-publish-docker.yml needed for release 2025-09-16 12:16:15 +02:00
github-actions[bot]
27a8e8b5a7 chore(version): update to version '1.0.1' 2025-09-16 10:00:39 +00:00
Roman Acevedo
d6620a34cd ci: try to use develop CI workflows 2025-09-16 11:38:34 +02:00
Loïc Mathieu
6f8b3c5cfd fix(flows): properly coompute flow dependencies with preconditions
When both upstream flows and where are set, it should be a AND between the two as dependencies must match the upstream flows.

Fixes #11164
2025-09-16 10:44:26 +02:00
Florian Hussonnois
6da6cbab60 fix(executions): add missing CrudEvent on purge execution
Related-to: kestra-io/kestra-ee#5061
2025-09-16 10:30:53 +02:00
Loïc Mathieu
a899e16178 fix(system): allow flattening a map with duplicated keys 2025-09-16 10:25:25 +02:00
Florian Hussonnois
568cd0b0c7 fix(core): fix CrudEvent model for DELETE operation
Refactor XxxRepository class to use new factory methods
from the CrudEvent class

Related-to: kestra-io/kestra-ee#4727
2025-09-15 18:51:36 +02:00
Loïc Mathieu
92e1dcb6eb fix(executions): truncate the execution_running table as in 0.24 there was an issue in the purge
This table contains executions for flows that have a concurrency that are currently running.
It has been added in 0.24 but in that release there was a bug that may prevent some records to being correctly removed from this table.
To fix that, we truncate it once.
2025-09-15 17:30:08 +02:00
brian-mulier-p
499e040cd0 fix(test): add tenant-in-path storage test (#11292)
part of kestra-io/storage-s3#166
2025-09-15 16:53:56 +02:00
brian-mulier-p
5916831d62 fix(security): enhance basic auth security (#11285)
closes kestra-io/kestra-ee#5111
2025-09-15 16:28:16 +02:00
Bart Ledoux
0b1b55957e fix: remove last uses of vuex as a store 2025-09-12 16:23:25 +02:00
Bart Ledoux
7ee40d376a flows: clear tasks list when last task is deleted 2025-09-12 16:15:36 +02:00
Florian Hussonnois
e2c9b3e256 fix(core): make CRC32 for plugin JARs lazy
Make CRC32 calculation for lazy plugin JAR files
to avoid excessive startup time and performance impact.

Avoid byte buffer reallocation while computing CRC32.
2025-09-12 14:02:23 +02:00
brian-mulier-p
556730777b fix(core): add ability to remap sort keys (#11233)
part of kestra-io/kestra-ee#5075
2025-09-12 09:44:32 +02:00
brian.mulier
c1a75a431f fix(ai): increase maxOutputToken default 2025-09-11 18:24:21 +02:00
brian-mulier-p
4a5b91667a fix(flows): avoid failing flow dependencies with dynamic defaults (#11166)
closes #11117
2025-09-10 16:15:04 +02:00
Roman Acevedo
f7b2af16a1 fix(flows): topology would not load when having many flows and cyclic relations
- this will probably fix https://github.com/kestra-io/kestra-ee/issues/4980

the issue was recursiveFlowTopology was returning a lot of duplicates, it was aggravated when having many Flows and multiple Flow triggers
2025-09-10 16:14:41 +02:00
Loïc Mathieu
9351cb22e0 fixsystem): always load netty from the app classloader
As Netty is used in core and a lot of plugins, and we already load project reactor from the app classloader that depends in Netty.

Fixes https://github.com/kestra-io/kestra-ee/issues/5038
2025-09-10 10:51:31 +02:00
brian-mulier-p
b1ecb82fdc fix(namespaces): avoid adding 'company.team' as default ns (#11174)
closes #11168
2025-09-09 17:14:27 +02:00
Miloš Paunović
c6d56151eb chore(flows): display correct flow dependency count (#11169)
Closes https://github.com/kestra-io/kestra/issues/11127.
2025-09-09 13:57:00 +02:00
François Delbrayelle
ed4398467a fix(outputs): open external file was not working (#11154) 2025-09-09 09:46:02 +02:00
brian-mulier-p
c51947419a chore(ci): add LTS tagging (#11131) 2025-09-08 14:10:53 +02:00
github-actions[bot]
ccb6a1f4a7 chore(version): update to version 'v1.0.0'. 2025-09-08 08:00:59 +00:00
275 changed files with 4813 additions and 4086 deletions

View File

@@ -1,29 +0,0 @@
name: 'Load Kestra Plugin List'
description: 'Composite action to load list of plugins'
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: true
plugin-file:
description: "File of the plugins"
default: './.plugins'
required: true
outputs:
plugins:
description: "List of all Kestra plugins"
value: ${{ steps.plugins.outputs.plugins }}
repositories:
description: "List of all Kestra repositories of plugins"
value: ${{ steps.plugins.outputs.repositories }}
runs:
using: composite
steps:
- name: Get Plugins List
id: plugins
shell: bash
run: |
PLUGINS=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | sed -e "s/LATEST/${{ inputs.plugin-version }}/g" | cut -d':' -f2- | xargs || echo '');
REPOSITORIES=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | cut -d':' -f1 | uniq | sort | xargs || echo '')
echo "plugins=$PLUGINS" >> $GITHUB_OUTPUT
echo "repositories=$REPOSITORIES" >> $GITHUB_OUTPUT

View File

@@ -1,20 +0,0 @@
name: 'Setup vars'
description: 'Composite action to setup common vars'
outputs:
tag:
description: "Git tag"
value: ${{ steps.vars.outputs.tag }}
commit:
description: "Git commit"
value: ${{ steps.vars.outputs.commit }}
runs:
using: composite
steps:
# Setup vars
- name: Set variables
id: vars
shell: bash
run: |
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
echo "commit=$(git rev-parse --short "$GITHUB_SHA")" >> $GITHUB_OUTPUT

View File

@@ -1,67 +0,0 @@
name: Auto-Translate UI keys and create PR
on:
schedule:
- cron: "0 9-21/3 * * *" # Every 3 hours from 9 AM to 9 PM
workflow_dispatch:
inputs:
retranslate_modified_keys:
description: "Whether to re-translate modified keys even if they already have translations."
type: choice
options:
- "false"
- "true"
default: "false"
required: false
jobs:
translations:
name: Translations
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v5
name: Checkout
with:
fetch-depth: 0
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.x"
- name: Install Python dependencies
run: pip install gitpython openai
- name: Generate translations
run: python ui/src/translations/generate_translations.py ${{ github.event.inputs.retranslate_modified_keys }}
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
- name: Set up Node
uses: actions/setup-node@v4
with:
node-version: "20.x"
- name: Set up Git
run: |
git config --global user.name "GitHub Action"
git config --global user.email "actions@github.com"
- name: Commit and create PR
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
BRANCH_NAME="chore/update-translations-$(date +%s)"
git checkout -b $BRANCH_NAME
git add ui/src/translations/*.json
if git diff --cached --quiet; then
echo "No changes to commit. Exiting with success."
exit 0
fi
git commit -m "chore(core): localize to languages other than english" -m "Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference."
git push -u origin $BRANCH_NAME || (git push origin --delete $BRANCH_NAME && git push -u origin $BRANCH_NAME)
gh pr create --title "Translations from en.json" --body $'This PR was created automatically by a GitHub Action.\n\nSomeone from the @kestra-io/frontend team needs to review and merge.' --base ${{ github.ref_name }} --head $BRANCH_NAME
- name: Check keys matching
run: node ui/src/translations/check.js

View File

@@ -1,85 +0,0 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
name: "CodeQL"
on:
schedule:
- cron: '0 5 * * 1'
workflow_dispatch: {}
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
# Override automatic language detection by changing the below list
# Supported options are ['csharp', 'cpp', 'go', 'java', 'javascript', 'python']
language: ['java', 'javascript']
# Learn more...
# https://docs.github.com/en/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#overriding-automatic-language-detection
steps:
- name: Checkout repository
uses: actions/checkout@v5
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.
fetch-depth: 2
# If this run was triggered by a pull request event, then checkout
# the head of the pull request instead of the merge commit.
- run: git checkout HEAD^2
if: ${{ github.event_name == 'pull_request' }}
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Set up JDK
- name: Set up JDK
uses: actions/setup-java@v5
if: ${{ matrix.language == 'java' }}
with:
distribution: 'temurin'
java-version: 21
- name: Setup gradle
if: ${{ matrix.language == 'java' }}
uses: gradle/actions/setup-gradle@v4
- name: Build with Gradle
if: ${{ matrix.language == 'java' }}
run: ./gradlew testClasses -x :ui:assembleFrontend
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
if: ${{ matrix.language != 'java' }}
uses: github/codeql-action/autobuild@v3
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3

View File

@@ -1,86 +0,0 @@
name: 'E2E tests revival'
description: 'New E2E tests implementation started by Roman. Based on playwright in npm UI project, tests Kestra OSS develop docker image. These tests are written from zero, lets make them unflaky from the start!.'
on:
schedule:
- cron: "0 * * * *" # Every hour
workflow_call:
inputs:
noInputYet:
description: 'not input yet.'
required: false
type: string
default: "no input"
workflow_dispatch:
inputs:
noInputYet:
description: 'not input yet.'
required: false
type: string
default: "no input"
jobs:
check:
timeout-minutes: 15
runs-on: ubuntu-latest
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
steps:
- name: Login to DockerHub
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ github.token }}
- name: Checkout kestra
uses: actions/checkout@v5
with:
path: kestra
# Setup build
- uses: kestra-io/actions/.github/actions/setup-build@main
name: Setup - Build
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
- name: Install Npm dependencies
run: |
cd kestra/ui
npm i
npx playwright install --with-deps chromium
- name: Run E2E Tests
run: |
cd kestra
sh build-and-start-e2e-tests.sh
- name: Upload Playwright Report as Github artifact
# 'With this report, you can analyze locally the results of the tests. see https://playwright.dev/docs/ci-intro#html-report'
uses: actions/upload-artifact@v4
if: ${{ !cancelled() }}
with:
name: playwright-report
path: kestra/ui/playwright-report/
retention-days: 7
# Allure check
# TODO I don't know what it should do
# - uses: rlespinasse/github-slug-action@v5
# name: Allure - Generate slug variables
#
# - name: Allure - Publish report
# uses: andrcuns/allure-publish-action@v2.9.0
# if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
# continue-on-error: true
# env:
# GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
# JAVA_HOME: /usr/lib/jvm/default-jvm/
# with:
# storageType: gcs
# resultsGlob: "**/build/allure-results"
# bucket: internal-kestra-host
# baseUrl: "https://internal.dev.kestra.io"
# prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
# copyLatest: true
# ignoreMissingResults: true

View File

@@ -1,82 +0,0 @@
name: Run Gradle Release for Kestra Plugins
on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.0)'
required: true
type: string
nextVersion:
description: 'The next version (e.g., 0.22.0-SNAPSHOT)'
required: true
type: string
dryRun:
description: 'Use DRY_RUN mode'
required: false
default: 'false'
jobs:
release:
name: Release plugins
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
caches-enabled: true
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins-list
with:
plugin-version: 'LATEST'
- name: 'Configure Git'
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
# Execute
- name: Run Gradle Release
if: ${{ github.event.inputs.dryRun == 'false' }}
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
--yes \
${{ steps.plugins-list.outputs.repositories }}
- name: Run Gradle Release (DRY_RUN)
if: ${{ github.event.inputs.dryRun == 'true' }}
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
--dry-run \
--yes \
${{ steps.plugins-list.outputs.repositories }}

View File

@@ -1,91 +0,0 @@
name: Run Gradle Release
run-name: "Releasing Kestra ${{ github.event.inputs.releaseVersion }} 🚀"
on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.0)'
required: true
type: string
nextVersion:
description: 'The next version (e.g., 0.22.0-SNAPSHOT)'
required: true
type: string
env:
RELEASE_VERSION: "${{ github.event.inputs.releaseVersion }}"
NEXT_VERSION: "${{ github.event.inputs.nextVersion }}"
jobs:
release:
name: Release Kestra
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/develop'
steps:
# Checks
- name: Check Inputs
run: |
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0$ ]]; then
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)\.0$"
exit 1
fi
if ! [[ "$NEXT_VERSION" =~ ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$ ]]; then
echo "Invalid next version. Must match regex: ^[0-9]+(\.[0-9]+)\.0-SNAPSHOT$"
exit 1;
fi
# Checkout
- uses: actions/checkout@v5
with:
fetch-depth: 0
path: kestra
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
caches-enabled: true
- name: Configure Git
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
# Execute
- name: Run Gradle Release
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
# Extract the major and minor versions
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
PUSH_RELEASE_BRANCH="releases/v${BASE_VERSION}.x"
cd kestra
# Create and push release branch
git checkout -b "$PUSH_RELEASE_BRANCH";
git push -u origin "$PUSH_RELEASE_BRANCH";
# Run gradle release
git checkout develop;
if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
./gradlew release -Prelease.useAutomaticVersion=true \
-Prelease.releaseVersion="${RELEASE_VERSION}" \
-Prelease.newVersion="${NEXT_VERSION}" \
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}" \
-Prelease.failOnSnapshotDependencies=false
else
./gradlew release -Prelease.useAutomaticVersion=true \
-Prelease.releaseVersion="${RELEASE_VERSION}" \
-Prelease.newVersion="${NEXT_VERSION}" \
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
fi

86
.github/workflows/main-build.yml vendored Normal file
View File

@@ -0,0 +1,86 @@
name: Main Workflow
on:
push:
branches:
- releases/*
- develop
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-main
cancel-in-progress: true
jobs:
backend-tests:
name: Backend tests
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
frontend-tests:
name: Frontend tests
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
publish-develop-docker:
name: Publish Docker
needs: [backend-tests, frontend-tests]
if: "!failure() && !cancelled() && github.ref == 'refs/heads/develop'"
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
with:
plugin-version: 'LATEST-SNAPSHOT'
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
publish-develop-maven:
name: Publish develop Maven
needs: [ backend-tests, frontend-tests ]
if: "!failure() && !cancelled() && github.ref == 'refs/heads/develop'"
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-maven.yml@main
secrets:
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
end:
runs-on: ubuntu-latest
needs: [publish-develop-docker, publish-develop-maven]
if: always()
steps:
- name: Trigger EE Workflow
uses: peter-evans/repository-dispatch@v3
if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/kestra-ee
event-type: "oss-updated"
# Slack
- name: Slack - Notification
if: ${{ failure() && env.SLACK_WEBHOOK_URL != 0 && (github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/develop') }}
uses: kestra-io/actions/composite/slack-status@main
with:
webhook-url: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -1,83 +0,0 @@
name: Main Workflow
on:
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
plugin-version:
description: "plugins version"
required: false
type: string
push:
branches:
- master
- main
- releases/*
- develop
tags:
- v*
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-main
cancel-in-progress: true
jobs:
tests:
name: Execute tests
uses: ./.github/workflows/workflow-test.yml
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
with:
report-status: false
release:
name: Release
needs: [tests]
if: "!failure() && !cancelled() && !startsWith(github.ref, 'refs/heads/releases')"
uses: ./.github/workflows/workflow-release.yml
with:
plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
end:
runs-on: ubuntu-latest
needs:
- release
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
- name: Trigger EE Workflow
uses: peter-evans/repository-dispatch@v3
if: github.ref == 'refs/heads/develop' && needs.release.result == 'success'
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/kestra-ee
event-type: "oss-updated"
# Slack
- name: Slack - Notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ":github-actions:"
channel: "C02DQ1A7JLR" # _int_git channel

60
.github/workflows/pre-release.yml vendored Normal file
View File

@@ -0,0 +1,60 @@
name: Pre Release
on:
push:
tags:
- 'v*'
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
jobs:
build-artifacts:
name: Build Artifacts
uses: kestra-io/actions/.github/workflows/kestra-oss-build-artifacts.yml@main
backend-tests:
name: Backend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
frontend-tests:
name: Frontend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
publish-maven:
name: Publish Maven
needs: [ backend-tests, frontend-tests ]
if: "!failure() && !cancelled()"
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-maven.yml@main
secrets:
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
publish-github:
name: Github Release
needs: [build-artifacts, backend-tests, frontend-tests]
if: "!failure() && !cancelled()"
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-github.yml@main
secrets:
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}

View File

@@ -3,11 +3,11 @@ name: Pull Request - Delete Docker
on:
pull_request:
types: [closed]
# TODO import a reusable one
jobs:
publish:
name: Pull Request - Delete Docker
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
if: github.repository == 'kestra-io/kestra' # prevent running on forks
runs-on: ubuntu-latest
steps:
- uses: dataaxiom/ghcr-cleanup-action@v1

View File

@@ -2,17 +2,12 @@ name: Pull Request Workflow
on:
pull_request:
branches:
- develop
concurrency:
group: ${{ github.workflow }}-${{ github.ref_name }}-pr
cancel-in-progress: true
jobs:
# ********************************************************************************************************************
# File changes detection
# ********************************************************************************************************************
file-changes:
if: ${{ github.event.pull_request.draft == false }}
name: File changes detection
@@ -33,14 +28,11 @@ jobs:
- '!{ui,.github}/**'
token: ${{ secrets.GITHUB_TOKEN }}
# ********************************************************************************************************************
# Tests
# ********************************************************************************************************************
frontend:
name: Frontend - Tests
needs: [file-changes]
if: "needs.file-changes.outputs.ui == 'true'"
uses: ./.github/workflows/workflow-frontend-test.yml
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
@@ -49,7 +41,7 @@ jobs:
name: Backend - Tests
needs: file-changes
if: "needs.file-changes.outputs.backend == 'true'"
uses: ./.github/workflows/workflow-backend-test.yml
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
@@ -58,21 +50,8 @@ jobs:
e2e-tests:
name: E2E - Tests
uses: ./.github/workflows/e2e.yml
uses: kestra-io/actions/.github/workflows/kestra-oss-e2e-tests.yml@main
end:
name: End
runs-on: ubuntu-latest
if: always()
needs: [frontend, backend]
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ":github-actions:"
channel: "C02DQ1A7JLR"
generate-pull-request-docker-image:
name: Generate PR docker image
uses: kestra-io/actions/.github/workflows/kestra-oss-pullrequest-publish-docker.yml@main

34
.github/workflows/release-docker.yml vendored Normal file
View File

@@ -0,0 +1,34 @@
name: Publish docker
on:
workflow_dispatch:
inputs:
retag-latest:
description: 'Retag latest Docker images'
required: true
type: boolean
default: false
retag-lts:
description: 'Retag LTS Docker images'
required: true
type: boolean
default: false
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
jobs:
publish-docker:
name: Publish Docker
if: startsWith(github.ref, 'refs/tags/v')
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
with:
plugin-version: ${{ inputs.plugin-version }}
retag-latest: ${{ inputs.retag-latest }}
retag-lts: ${{ inputs.retag-lts }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -1,60 +0,0 @@
name: Set Version and Tag Plugins
on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.0)'
required: true
type: string
dryRun:
description: 'Use DRY_RUN mode'
required: false
default: 'false'
jobs:
tag:
name: Release plugins
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins-list
with:
plugin-version: 'LATEST'
- name: 'Configure Git'
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
# Execute
- name: Set Version and Tag Plugins
if: ${{ github.event.inputs.dryRun == 'false' }}
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/setversion-tag-plugins.sh;
./dev-tools/setversion-tag-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--yes \
${{ steps.plugins-list.outputs.repositories }}
- name: Set Version and Tag Plugins (DRY_RUN)
if: ${{ github.event.inputs.dryRun == 'true' }}
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/setversion-tag-plugins.sh;
./dev-tools/setversion-tag-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--dry-run \
--yes \
${{ steps.plugins-list.outputs.repositories }}

View File

@@ -1,60 +0,0 @@
name: Set Version and Tag
run-name: "Set version and Tag Kestra to ${{ github.event.inputs.releaseVersion }} 🚀"
on:
workflow_dispatch:
inputs:
releaseVersion:
description: 'The release version (e.g., 0.21.1)'
required: true
type: string
env:
RELEASE_VERSION: "${{ github.event.inputs.releaseVersion }}"
jobs:
release:
name: Release Kestra
runs-on: ubuntu-latest
if: startsWith(github.ref, 'refs/heads/releases/v')
steps:
# Checks
- name: Check Inputs
run: |
if ! [[ "$RELEASE_VERSION" =~ ^[0-9]+(\.[0-9]+)(\.[0-9]+)(-rc[0-9])?(-SNAPSHOT)?$ ]]; then
echo "Invalid release version. Must match regex: ^[0-9]+(\.[0-9]+)(\.[0-9]+)-(rc[0-9])?(-SNAPSHOT)?$"
exit 1
fi
# Extract the major and minor versions
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
RELEASE_BRANCH="refs/heads/releases/v${BASE_VERSION}.x"
CURRENT_BRANCH="$GITHUB_REF"
if ! [[ "$CURRENT_BRANCH" == "$RELEASE_BRANCH" ]]; then
echo "Invalid release branch. Expected $RELEASE_BRANCH, was $CURRENT_BRANCH"
exit 1
fi
# Checkout
- name: Checkout
uses: actions/checkout@v5
with:
fetch-depth: 0
token: ${{ secrets.GH_PERSONAL_TOKEN }}
# Configure
- name: Git - Configure
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
# Execute
- name: Run Gradle Release
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
# Update version
sed -i "s/^version=.*/version=$RELEASE_VERSION/" ./gradle.properties
git add ./gradle.properties
git commit -m"chore(version): update to version '$RELEASE_VERSION'"
git push
git tag -a "v$RELEASE_VERSION" -m"v$RELEASE_VERSION"
git push --tags

View File

@@ -21,20 +21,12 @@ jobs:
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
- uses: kestra-io/actions/composite/setup-build@main
id: build
with:
java-enabled: true
node-enabled: true
caches-enabled: true
# Npm
- name: Npm - Install
@@ -56,92 +48,3 @@ jobs:
with:
name: dependency-check-report
path: build/reports/dependency-check-report.html
develop-image-check:
name: Image Check (develop)
runs-on: ubuntu-latest
permissions:
contents: read
security-events: write
actions: read
steps:
# Checkout
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: false
node-enabled: false
caches-enabled: true
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@0.33.0
with:
image-ref: kestra/kestra:develop
format: 'template'
template: '@/contrib/sarif.tpl'
severity: 'CRITICAL,HIGH'
output: 'trivy-results.sarif'
skip-dirs: /app/plugins
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
category: docker-
latest-image-check:
name: Image Check (latest)
runs-on: ubuntu-latest
permissions:
contents: read
security-events: write
actions: read
steps:
# Checkout
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- uses: ./actions/.github/actions/setup-build
id: build
with:
java-enabled: false
node-enabled: false
caches-enabled: true
# Run Trivy image scan for Docker vulnerabilities, see https://github.com/aquasecurity/trivy-action
- name: Docker Vulnerabilities Check
uses: aquasecurity/trivy-action@0.33.0
with:
image-ref: kestra/kestra:latest
format: table
skip-dirs: /app/plugins
scanners: vuln
severity: 'CRITICAL,HIGH'
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'

View File

@@ -1,142 +0,0 @@
name: Backend - Tests
on:
workflow_call:
secrets:
GITHUB_AUTH_TOKEN:
description: "The GitHub Token."
required: true
CODECOV_TOKEN:
description: 'Codecov Token'
required: true
SONAR_TOKEN:
description: 'Sonar Token'
required: true
GOOGLE_SERVICE_ACCOUNT:
description: 'Google Service Account'
required: true
permissions:
contents: write
checks: write
actions: read
jobs:
test:
name: Backend - Tests
runs-on: ubuntu-latest
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
steps:
- uses: actions/checkout@v5
name: Checkout - Current ref
with:
fetch-depth: 0
# Setup build
- uses: kestra-io/actions/.github/actions/setup-build@main
name: Setup - Build
id: build
with:
java-enabled: true
node-enabled: true
python-enabled: true
# Services
- name: Setup - Start docker compose
shell: bash
run: docker compose -f docker-compose-ci.yml up -d
# Gradle check
- name: Gradle - Build
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
shell: bash
run: |
echo $GOOGLE_SERVICE_ACCOUNT | base64 -d > ~/.gcp-service-account.json
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.gcp-service-account.json
./gradlew check javadoc --parallel
# report test
- name: Test - Publish Test Results
uses: dorny/test-reporter@v2
if: always()
with:
name: Java Tests Report
reporter: java-junit
path: '**/build/test-results/test/TEST-*.xml'
list-suites: 'failed'
list-tests: 'failed'
fail-on-error: 'false'
token: ${{ secrets.GITHUB_AUTH_TOKEN }}
# Sonar
- name: Test - Analyze with Sonar
if: env.SONAR_TOKEN != ''
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
shell: bash
run: ./gradlew sonar --info
# GCP
- name: GCP - Auth with unit test account
id: auth
if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
uses: "google-github-actions/auth@v3"
with:
credentials_json: "${{ secrets.GOOGLE_SERVICE_ACCOUNT }}"
- name: GCP - Setup Cloud SDK
if: env.GOOGLE_SERVICE_ACCOUNT != ''
uses: "google-github-actions/setup-gcloud@v3"
# Allure check
- uses: rlespinasse/github-slug-action@v5
name: Allure - Generate slug variables
- name: Allure - Publish report
uses: andrcuns/allure-publish-action@v2.9.0
if: always() && env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
env:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_AUTH_TOKEN }}
JAVA_HOME: /usr/lib/jvm/default-jvm/
with:
storageType: gcs
resultsGlob: "**/build/allure-results"
bucket: internal-kestra-host
baseUrl: "https://internal.dev.kestra.io"
prefix: ${{ format('{0}/{1}', github.repository, 'allure/java') }}
copyLatest: true
ignoreMissingResults: true
# Jacoco
- name: Jacoco - Copy reports
if: env.GOOGLE_SERVICE_ACCOUNT != ''
continue-on-error: true
shell: bash
run: |
mv build/reports/jacoco/testCodeCoverageReport build/reports/jacoco/test/
mv build/reports/jacoco/test/testCodeCoverageReport.xml build/reports/jacoco/test/jacocoTestReport.xml
gsutil -m rsync -d -r build/reports/jacoco/test/ gs://internal-kestra-host/${{ format('{0}/{1}', github.repository, 'jacoco') }}
# Codecov
- name: Codecov - Upload coverage reports
uses: codecov/codecov-action@v5
if: ${{ !cancelled() }}
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: backend
- name: Codecov - Upload test results
uses: codecov/test-results-action@v1
if: ${{ !cancelled() }}
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }}
flags: backend

View File

@@ -1,80 +0,0 @@
name: Build Artifacts
on:
workflow_call: {}
jobs:
build:
name: Build - Artifacts
runs-on: ubuntu-latest
outputs:
docker-tag: ${{ steps.vars.outputs.tag }}
docker-artifact-name: ${{ steps.vars.outputs.artifact }}
plugins: ${{ steps.plugins.outputs.plugins }}
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
steps:
- name: Checkout - Current ref
uses: actions/checkout@v5
with:
fetch-depth: 0
# Npm
- name: Setup - Npm install
shell: bash
working-directory: ui
run: npm ci
# Setup build
- uses: kestra-io/actions/.github/actions/setup-build@main
name: Setup - Build
id: build
with:
java-enabled: true
node-enabled: true
# Get Plugins List
- name: Plugins - Get List
uses: ./.github/actions/plugins-list
if: "!startsWith(github.ref, 'refs/tags/v')"
id: plugins-list
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
# Set Plugins List
- name: Plugins - Set List
id: plugins
if: "!startsWith(github.ref, 'refs/tags/v')"
shell: bash
run: |
PLUGINS="${{ steps.plugins-list.outputs.plugins }}"
TAG=${GITHUB_REF#refs/*/}
if [[ $TAG = "master" || $TAG == v* ]]; then
echo "plugins=$PLUGINS" >> $GITHUB_OUTPUT
else
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
fi
# Build
- name: Gradle - Build
shell: bash
run: |
./gradlew executableJar
- name: Artifacts - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Upload artifacts
- name: Artifacts - Upload JAR
uses: actions/upload-artifact@v4
with:
name: jar
path: build/libs/
- name: Artifacts - Upload Executable
uses: actions/upload-artifact@v4
with:
name: exe
path: build/executable/

View File

@@ -1,70 +0,0 @@
name: Frontend - Tests
on:
workflow_call:
secrets:
GITHUB_AUTH_TOKEN:
description: "The GitHub Token."
required: true
CODECOV_TOKEN:
description: 'Codecov Token'
required: true
env:
# to save corepack from itself
COREPACK_INTEGRITY_KEYS: 0
jobs:
test:
name: Frontend - Tests
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v5
- name: Cache Node Modules
id: cache-node-modules
uses: actions/cache@v4
with:
path: |
ui/node_modules
key: modules-${{ hashFiles('ui/package-lock.json') }}
- name: Cache Playwright Binaries
id: cache-playwright
uses: actions/cache@v4
with:
path: |
~/.cache/ms-playwright
key: playwright-${{ hashFiles('ui/package-lock.json') }}
- name: Npm - install
if: steps.cache-node-modules.outputs.cache-hit != 'true'
working-directory: ui
run: npm ci
- name: Npm - lint
uses: reviewdog/action-eslint@v1
with:
github_token: ${{ secrets.GITHUB_AUTH_TOKEN }}
reporter: github-pr-review
workdir: ui
- name: Npm - Run build
working-directory: ui
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
run: npm run build
- name: Run front-end unit tests
working-directory: ui
run: npm run test:unit -- --coverage
- name: Storybook - Install Playwright
working-directory: ui
if: steps.cache-playwright.outputs.cache-hit != 'true'
run: npx playwright install --with-deps
- name: Run storybook component tests
working-directory: ui
run: npm run test:storybook -- --coverage

View File

@@ -1,88 +0,0 @@
name: Github - Release
on:
workflow_dispatch:
workflow_call:
secrets:
GH_PERSONAL_TOKEN:
description: "The Github personal token."
required: true
SLACK_RELEASES_WEBHOOK_URL:
description: "The Slack webhook URL."
required: true
jobs:
publish:
name: Github - Release
runs-on: ubuntu-latest
steps:
# Check out
- name: Checkout - Repository
uses: actions/checkout@v5
with:
fetch-depth: 0
submodules: true
# Checkout GitHub Actions
- name: Checkout - Actions
uses: actions/checkout@v5
with:
repository: kestra-io/actions
sparse-checkout-cone-mode: true
path: actions
sparse-checkout: |
.github/actions
# Download Exec
# Must be done after checkout actions
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
if: startsWith(github.ref, 'refs/tags/v')
with:
name: exe
path: build/executable
- name: Check if current tag is latest
id: is_latest
run: |
latest_tag=$(git tag | grep -E '^v[0-9]+\.[0-9]+\.[0-9]+$' | sed 's/^v//' | sort -V | tail -n1)
current_tag="${GITHUB_REF_NAME#v}"
if [ "$current_tag" = "$latest_tag" ]; then
echo "latest=true" >> $GITHUB_OUTPUT
else
echo "latest=false" >> $GITHUB_OUTPUT
fi
env:
GITHUB_REF_NAME: ${{ github.ref_name }}
# GitHub Release
- name: Create GitHub release
uses: ./actions/.github/actions/github-release
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
env:
MAKE_LATEST: ${{ steps.is_latest.outputs.latest }}
GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
# Trigger gha workflow to bump helm chart version
- name: GitHub - Trigger the Helm chart version bump
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ secrets.GH_PERSONAL_TOKEN }}
repository: kestra-io/helm-charts
event-type: update-helm-chart-version
client-payload: |-
{
"new_version": "${{ github.ref_name }}",
"github_repository": "${{ github.repository }}",
"github_actor": "${{ github.actor }}"
}
- name: Merge Release Notes
if: ${{ startsWith(github.ref, 'refs/tags/v') }}
uses: ./actions/.github/actions/github-release-note-merge
env:
GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
RELEASE_TAG: ${{ github.ref_name }}

View File

@@ -1,200 +0,0 @@
name: Create Docker images on Release
on:
workflow_dispatch:
inputs:
retag-latest:
description: 'Retag latest Docker images'
required: true
type: choice
default: "false"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag (by default, deduced with the ref)'
required: false
type: string
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
force-download-artifact:
description: 'Force download artifact'
required: false
type: choice
default: "true"
options:
- "true"
- "false"
workflow_call:
inputs:
plugin-version:
description: "Plugin version"
default: 'LATEST'
required: false
type: string
force-download-artifact:
description: 'Force download artifact'
required: false
type: string
default: "true"
secrets:
DOCKERHUB_USERNAME:
description: "The Dockerhub username."
required: true
DOCKERHUB_PASSWORD:
description: "The Dockerhub password."
required: true
env:
PLUGIN_VERSION: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
jobs:
plugins:
name: List Plugins
runs-on: ubuntu-latest
outputs:
plugins: ${{ steps.plugins.outputs.plugins }}
steps:
# Checkout
- uses: actions/checkout@v5
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins
with: # remap LATEST-SNAPSHOT to LATEST
plugin-version: ${{ env.PLUGIN_VERSION == 'LATEST-SNAPSHOT' && 'LATEST' || env.PLUGIN_VERSION }}
# ********************************************************************************************************************
# Build
# ********************************************************************************************************************
build-artifacts:
name: Build Artifacts
if: ${{ inputs.force-download-artifact == 'true' }}
uses: ./.github/workflows/workflow-build-artifacts.yml
docker:
name: Publish Docker
needs: [ plugins, build-artifacts ]
if: always()
runs-on: ubuntu-latest
strategy:
matrix:
image:
- name: "-no-plugins"
plugins: ""
packages: jattach
python-libs: ""
- name: ""
plugins: ${{needs.plugins.outputs.plugins}}
packages: python3 python-is-python3 python3-pip curl jattach
python-libs: kestra
steps:
- uses: actions/checkout@v5
# Vars
- name: Set image name
id: vars
run: |
if [[ "${{ inputs.release-tag }}" == "" ]]; then
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
else
TAG="${{ inputs.release-tag }}"
echo "tag=${TAG}" >> $GITHUB_OUTPUT
fi
if [[ $GITHUB_REF == refs/tags/* ]]; then
if [[ $TAG =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
# this will remove the patch version number
MINOR_SEMVER=${TAG%.*}
echo "minor_semver=${MINOR_SEMVER}" >> $GITHUB_OUTPUT
else
echo "Tag '$TAG' is not a valid semver (vMAJOR.MINOR.PATCH), skipping minor_semver"
fi
fi
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# Download executable from artifact
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker setup
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# Docker Build and push
- name: Push to Docker Hub
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
APT_PACKAGES=${{ matrix.image.packages }}
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
- name: Install regctl
if: startsWith(github.ref, 'refs/tags/v')
uses: regclient/actions/regctl-installer@main
- name: Retag to minor semver version
if: startsWith(github.ref, 'refs/tags/v') && steps.vars.outputs.minor_semver != ''
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.minor_semver, matrix.image.name) }}
- name: Retag to latest
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-latest == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
- docker
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ':github-actions:'
channel: 'C02DQ1A7JLR' # _int_git channel

View File

@@ -1,57 +0,0 @@
name: Publish - Maven
on:
workflow_call:
secrets:
SONATYPE_USER:
description: "The Sonatype username."
required: true
SONATYPE_PASSWORD:
description: "The Sonatype password."
required: true
SONATYPE_GPG_KEYID:
description: "The Sonatype GPG key id."
required: true
SONATYPE_GPG_PASSWORD:
description: "The Sonatype GPG password."
required: true
SONATYPE_GPG_FILE:
description: "The Sonatype GPG file."
required: true
jobs:
publish:
name: Publish - Maven
runs-on: ubuntu-latest
steps:
- name: Checkout - Current ref
uses: actions/checkout@v5
# Setup build
- name: Setup - Build
uses: kestra-io/actions/.github/actions/setup-build@main
id: build
with:
java-enabled: true
node-enabled: true
# Publish
- name: Publish - Release package to Maven Central
shell: bash
env:
ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE}}
run: |
mkdir -p ~/.gradle/
echo "signing.keyId=${SONATYPE_GPG_KEYID}" > ~/.gradle/gradle.properties
echo "signing.password=${SONATYPE_GPG_PASSWORD}" >> ~/.gradle/gradle.properties
echo "signing.secretKeyRingFile=${HOME}/.gradle/secring.gpg" >> ~/.gradle/gradle.properties
echo ${SONATYPE_GPG_FILE} | base64 -d > ~/.gradle/secring.gpg
./gradlew publishToMavenCentral
# Gradle dependency
- name: Java - Gradle dependency graph
uses: gradle/actions/dependency-submission@v4

View File

@@ -1,78 +0,0 @@
name: Pull Request - Publish Docker
on:
pull_request:
branches:
- develop
jobs:
build-artifacts:
name: Build Artifacts
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
uses: ./.github/workflows/workflow-build-artifacts.yml
publish:
name: Publish Docker
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
runs-on: ubuntu-latest
needs: build-artifacts
env:
GITHUB_IMAGE_PATH: "ghcr.io/kestra-io/kestra-pr"
steps:
- name: Checkout - Current ref
uses: actions/checkout@v5
with:
fetch-depth: 0
# Docker setup
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Setup Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to GHCR
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Docker - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
- name: Docker - Build image
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile.pr
push: true
tags: ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}
platforms: linux/amd64,linux/arm64
# Add comment on pull request
- name: Add comment to PR
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
await github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: `**🐋 Docker image**: \`${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}\`\n` +
`\n` +
`\`\`\`bash\n` +
`docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }} server local\n` +
`\`\`\``
})

View File

@@ -1,85 +0,0 @@
name: Release
on:
workflow_dispatch:
inputs:
plugin-version:
description: "plugins version"
default: 'LATEST'
required: false
type: string
publish-docker:
description: "Publish Docker image"
default: 'false'
required: false
type: string
workflow_call:
inputs:
plugin-version:
description: "plugins version"
default: 'LATEST'
required: false
type: string
secrets:
DOCKERHUB_USERNAME:
description: "The Dockerhub username."
required: true
DOCKERHUB_PASSWORD:
description: "The Dockerhub password."
required: true
SONATYPE_USER:
description: "The Sonatype username."
required: true
SONATYPE_PASSWORD:
description: "The Sonatype password."
required: true
SONATYPE_GPG_KEYID:
description: "The Sonatype GPG key id."
required: true
SONATYPE_GPG_PASSWORD:
description: "The Sonatype GPG password."
required: true
SONATYPE_GPG_FILE:
description: "The Sonatype GPG file."
required: true
GH_PERSONAL_TOKEN:
description: "GH personnal Token."
required: true
SLACK_RELEASES_WEBHOOK_URL:
description: "Slack webhook for releases channel."
required: true
jobs:
build-artifacts:
name: Build - Artifacts
uses: ./.github/workflows/workflow-build-artifacts.yml
Docker:
name: Publish Docker
needs: build-artifacts
uses: ./.github/workflows/workflow-publish-docker.yml
if: github.ref == 'refs/heads/develop' || inputs.publish-docker == 'true'
with:
force-download-artifact: 'false'
plugin-version: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
Maven:
name: Publish Maven
uses: ./.github/workflows/workflow-publish-maven.yml
secrets:
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
Github:
name: Github Release
needs: build-artifacts
if: startsWith(github.ref, 'refs/tags/v')
uses: ./.github/workflows/workflow-github-release.yml
secrets:
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}

View File

@@ -1,97 +0,0 @@
name: Tests
on:
schedule:
- cron: '0 4 * * 1,2,3,4,5'
workflow_call:
inputs:
report-status:
description: "Report status of the jobs in outputs"
type: string
required: false
default: false
outputs:
frontend_status:
description: "Status of the frontend job"
value: ${{ jobs.set-frontend-status.outputs.frontend_status }}
backend_status:
description: "Status of the backend job"
value: ${{ jobs.set-backend-status.outputs.backend_status }}
jobs:
file-changes:
name: File changes detection
runs-on: ubuntu-latest
timeout-minutes: 60
outputs:
ui: ${{ steps.changes.outputs.ui }}
backend: ${{ steps.changes.outputs.backend }}
steps:
- uses: actions/checkout@v5
if: "!startsWith(github.ref, 'refs/tags/v')"
- uses: dorny/paths-filter@v3
if: "!startsWith(github.ref, 'refs/tags/v')"
id: changes
with:
filters: |
ui:
- 'ui/**'
backend:
- '!{ui,.github}/**'
token: ${{ secrets.GITHUB_TOKEN }}
frontend:
name: Frontend - Tests
needs: file-changes
if: "needs.file-changes.outputs.ui == 'true' || startsWith(github.ref, 'refs/tags/v')"
uses: ./.github/workflows/workflow-frontend-test.yml
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
backend:
name: Backend - Tests
needs: file-changes
if: "needs.file-changes.outputs.backend == 'true' || startsWith(github.ref, 'refs/tags/v')"
uses: ./.github/workflows/workflow-backend-test.yml
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
# Output every job status
# To be used in other workflows
report-status:
name: Report Status
runs-on: ubuntu-latest
needs: [ frontend, backend ]
if: always() && (inputs.report-status == 'true')
outputs:
frontend_status: ${{ steps.set-frontend-status.outputs.frontend_status }}
backend_status: ${{ steps.set-backend-status.outputs.backend_status }}
steps:
- id: set-frontend-status
name: Set frontend job status
run: echo "::set-output name=frontend_status::${{ needs.frontend.result }}"
- id: set-backend-status
name: Set backend job status
run: echo "::set-output name=backend_status::${{ needs.backend.result }}"
notify:
name: Notify - Slack
runs-on: ubuntu-latest
needs: [ frontend, backend ]
if: github.event_name == 'schedule'
steps:
- name: Notify failed CI
id: send-ci-failed
if: |
always() && (needs.frontend.result != 'success' ||
needs.backend.result != 'success')
uses: kestra-io/actions/.github/actions/send-ci-failed@main
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -1,4 +1,5 @@
FROM kestra/kestra:develop
ARG KESTRA_DOCKER_BASE_VERSION=develop
FROM kestra/kestra:$KESTRA_DOCKER_BASE_VERSION
USER root

View File

@@ -205,23 +205,59 @@ subprojects {
testImplementation 'org.assertj:assertj-core'
}
test {
useJUnitPlatform()
def commonTestConfig = { Test t ->
// set Xmx for test workers
maxHeapSize = '4g'
t.maxHeapSize = '4g'
// configure en_US default locale for tests
systemProperty 'user.language', 'en'
systemProperty 'user.country', 'US'
t.systemProperty 'user.language', 'en'
t.systemProperty 'user.country', 'US'
environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
environment 'SECRET_NON_B64_SECRET', "some secret value"
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
environment 'ENV_TEST1', "true"
environment 'ENV_TEST2', "Pass by env"
t.environment 'SECRET_MY_SECRET', "{\"secretKey\":\"secretValue\"}".bytes.encodeBase64().toString()
t.environment 'SECRET_NEW_LINE', "cGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2\nZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZleXJsb25ncGFzc3dvcmR2ZXJ5dmVyeXZl\neXJsb25n"
t.environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
t.environment 'SECRET_NON_B64_SECRET', "some secret value"
t.environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
t.environment 'ENV_TEST1', "true"
t.environment 'ENV_TEST2', "Pass by env"
}
tasks.register('flakyTest', Test) { Test t ->
group = 'verification'
description = 'Runs tests tagged @Flaky but does not fail the build.'
useJUnitPlatform {
includeTags 'flaky'
}
ignoreFailures = true
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
}
commonTestConfig(t)
}
test {
useJUnitPlatform {
excludeTags 'flaky'
}
reports {
junitXml.required = true
junitXml.outputPerTestCase = true
junitXml.mergeReruns = true
junitXml.includeSystemErrLog = true
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
}
commonTestConfig(it)
finalizedBy(tasks.named('flakyTest'))
}
testlogger {

View File

@@ -117,7 +117,7 @@ public abstract class AbstractValidateCommand extends AbstractApiCommand {
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows/validate", tenantService.getTenantId(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows/validate", tenantService.getTenantIdAndAllowEETenants(tenantId)), body).contentType(MediaType.APPLICATION_YAML);
List<ValidateConstraintViolation> validations = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -24,7 +24,8 @@ public class FlowValidateCommand extends AbstractValidateCommand {
private FlowService flowService;
@Inject
private TenantIdSelectorService tenantService;
private TenantIdSelectorService tenantIdSelectorService;
@Override
public Integer call() throws Exception {
@@ -39,7 +40,7 @@ public class FlowValidateCommand extends AbstractValidateCommand {
FlowWithSource flow = (FlowWithSource) object;
List<String> warnings = new ArrayList<>();
warnings.addAll(flowService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
warnings.addAll(flowService.warnings(flow, tenantService.getTenantId(tenantId)));
warnings.addAll(flowService.warnings(flow, tenantIdSelectorService.getTenantIdAndAllowEETenants(tenantId)));
return warnings;
},
(Object object) -> {

View File

@@ -64,7 +64,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
}
try(DefaultHttpClient client = client()) {
MutableHttpRequest<String> request = HttpRequest
.POST(apiUri("/flows/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
.POST(apiUri("/flows/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "?delete=" + delete, body).contentType(MediaType.APPLICATION_YAML);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -49,7 +49,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
try (var files = Files.walk(from); DefaultHttpClient client = client()) {
if (delete) {
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + to, null)));
client.toBlocking().exchange(this.requestOptions(HttpRequest.DELETE(apiUri("/namespaces/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "/files?path=" + to, null)));
}
KestraIgnore kestraIgnore = new KestraIgnore(from);
@@ -67,7 +67,7 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
client.toBlocking().exchange(
this.requestOptions(
HttpRequest.POST(
apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/files?path=" + destination,
apiUri("/namespaces/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "/files?path=" + destination,
body
).contentType(MediaType.MULTIPART_FORM_DATA)
)

View File

@@ -3,7 +3,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.executor.SkipExecutionService;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;

View File

@@ -4,10 +4,13 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.Indexer;
import io.kestra.core.utils.Await;
import io.kestra.core.services.SkipExecutionService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@CommandLine.Command(
@@ -17,6 +20,11 @@ import java.util.Map;
public class IndexerCommand extends AbstractServerCommand {
@Inject
private ApplicationContext applicationContext;
@Inject
private SkipExecutionService skipExecutionService;
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
private List<String> skipIndexerRecords = Collections.emptyList();
@SuppressWarnings("unused")
public static Map<String, Object> propertiesOverrides() {
@@ -27,6 +35,8 @@ public class IndexerCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
this.skipExecutionService.setSkipIndexerRecords(skipIndexerRecords);
super.call();
Indexer indexer = applicationContext.getBean(Indexer.class);

View File

@@ -7,7 +7,7 @@ import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.cli.StandAloneRunner;
import io.kestra.executor.SkipExecutionService;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
import io.micronaut.context.ApplicationContext;
@@ -63,6 +63,9 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipTenants = Collections.emptyList();
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
private List<String> skipIndexerRecords = Collections.emptyList();
@CommandLine.Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
boolean tutorialsDisabled = false;
@@ -93,6 +96,7 @@ public class StandAloneCommand extends AbstractServerCommand {
this.skipExecutionService.setSkipFlows(skipFlows);
this.skipExecutionService.setSkipNamespaces(skipNamespaces);
this.skipExecutionService.setSkipTenants(skipTenants);
this.skipExecutionService.setSkipIndexerRecords(skipIndexerRecords);
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);
KestraContext.getContext().injectWorkerConfigs(workerThread, null);

View File

@@ -5,12 +5,15 @@ import io.kestra.core.models.ServerType;
import io.kestra.core.runners.Indexer;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.services.SkipExecutionService;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Option;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -28,11 +31,17 @@ public class WebServerCommand extends AbstractServerCommand {
@Inject
private ExecutorsUtils executorsUtils;
@Inject
private SkipExecutionService skipExecutionService;
@Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
boolean tutorialsDisabled = false;
private boolean tutorialsDisabled = false;
@Option(names = {"--no-indexer"}, description = "Flag to disable starting an embedded indexer.")
boolean indexerDisabled = false;
private boolean indexerDisabled = false;
@CommandLine.Option(names = {"--skip-indexer-records"}, split=",", description = "a list of indexer record keys, separated by a coma; for troubleshooting purpose only")
private List<String> skipIndexerRecords = Collections.emptyList();
@Override
public boolean isFlowAutoLoadEnabled() {
@@ -48,6 +57,8 @@ public class WebServerCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
this.skipExecutionService.setSkipIndexerRecords(skipIndexerRecords);
super.call();
// start the indexer

View File

@@ -6,6 +6,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.services.ConcurrencyLimitService;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
@@ -15,8 +16,6 @@ import picocli.CommandLine;
import java.util.Optional;
import static io.kestra.core.utils.Rethrow.throwConsumer;
@CommandLine.Command(
name = "submit-queued-execution",
description = {"Submit all queued execution to the executor",
@@ -49,9 +48,11 @@ public class SubmitQueuedCommand extends AbstractCommand {
}
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {
executionQueuedStorage.pop(queued.getTenantId(), queued.getNamespace(), queued.getFlowId(), throwConsumer(execution -> executionQueue.emit(execution.withState(State.Type.CREATED))));
Execution restart = concurrencyLimitService.unqueue(queued.getExecution(), State.Type.RUNNING);
executionQueue.emit(restart);
cpt++;
}
}

View File

@@ -49,7 +49,7 @@ public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpda
try (DefaultHttpClient client = client()) {
MutableHttpRequest<List<Template>> request = HttpRequest
.POST(apiUri("/templates/", tenantService.getTenantId(tenantId)) + namespace + "?delete=" + delete, templates);
.POST(apiUri("/templates/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "?delete=" + delete, templates);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),

View File

@@ -16,4 +16,11 @@ public class TenantIdSelectorService {
}
return MAIN_TENANT;
}
public String getTenantIdAndAllowEETenants(String tenantId) {
if (StringUtils.isNotBlank(tenantId)){
return tenantId;
}
return MAIN_TENANT;
}
}

View File

@@ -167,6 +167,9 @@ kestra:
open-urls:
- "/ping"
- "/api/v1/executions/webhook/"
- "/api/v1/main/executions/webhook/"
- "/api/v1/*/executions/webhook/"
- "/api/v1/basicAuthValidationErrors"
preview:
initial-rows: 100

View File

@@ -27,6 +27,26 @@ class FlowValidateCommandTest {
}
}
@Test
// github action kestra-io/validate-action requires being able to validate Flows from OSS CLI against a remote EE instance
void runForEEInstance() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {
"--tenant",
"some-ee-tenant",
"--local",
"src/test/resources/helper/include.yaml"
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - io.kestra.cli / include");
}
}
@Test
void warning() {
ByteArrayOutputStream out = new ByteArrayOutputStream();

View File

@@ -7,6 +7,8 @@ import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
/**
* Top-level marker interface for Kestra's plugin of type App.
*/
@@ -18,6 +20,6 @@ public interface AppBlockInterface extends io.kestra.core.models.Plugin {
)
@NotNull
@NotBlank
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
String getType();
}

View File

@@ -7,6 +7,8 @@ import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
/**
* Top-level marker interface for Kestra's plugin of type App.
*/
@@ -18,6 +20,6 @@ public interface AppPluginInterface extends io.kestra.core.models.Plugin {
)
@NotNull
@NotBlank
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
String getType();
}

View File

@@ -15,6 +15,7 @@ import com.github.victools.jsonschema.generator.impl.DefinitionKey;
import com.github.victools.jsonschema.generator.naming.DefaultSchemaDefinitionNamingStrategy;
import com.github.victools.jsonschema.module.jackson.JacksonModule;
import com.github.victools.jsonschema.module.jackson.JacksonOption;
import com.github.victools.jsonschema.module.jackson.JsonUnwrappedDefinitionProvider;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationModule;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationOption;
import com.github.victools.jsonschema.module.swagger2.Swagger2Module;
@@ -45,6 +46,9 @@ import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.*;
import java.time.*;
@@ -58,7 +62,9 @@ import static io.kestra.core.docs.AbstractClassDocumentation.required;
import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@Slf4j
public class JsonSchemaGenerator {
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
@@ -118,7 +124,7 @@ public class JsonSchemaGenerator {
removeRequiredOnPropsWithDefaults(objectNode);
return MAPPER.convertValue(objectNode, MAP_TYPE_REFERENCE);
} catch (IllegalArgumentException e) {
} catch (Exception e) {
throw new IllegalArgumentException("Unable to generate jsonschema for '" + cls.getName() + "'", e);
}
}
@@ -270,8 +276,22 @@ public class JsonSchemaGenerator {
.with(Option.DEFINITIONS_FOR_ALL_OBJECTS)
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
.with(Option.PLAIN_DEFINITION_KEYS)
.with(Option.ALLOF_CLEANUP_AT_THE_END);;
.with(Option.ALLOF_CLEANUP_AT_THE_END);
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
try {
return super.provideCustomSchemaDefinition(javaType, context);
} catch (NoClassDefFoundError e) {
// This error happens when a non-supported plugin type exists in the classpath.
log.debug("Cannot create schema definition for type '{}'. Cause: NoClassDefFoundError", javaType.getTypeName());
return new CustomDefinition(context.getGeneratorConfig().createObjectNode(), true);
}
}
});
if (!draft7) {
builder.with(new JacksonModule(JacksonOption.IGNORE_TYPE_INFO_TRANSFORM));
} else {
@@ -300,6 +320,7 @@ public class JsonSchemaGenerator {
// inline some type
builder.forTypesInGeneral()
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {

View File

@@ -3,30 +3,88 @@ package io.kestra.core.events;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.context.ServerRequestContext;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
import java.util.Objects;
@Getter
public class CrudEvent<T> {
T model;
private final T model;
@Nullable
T previousModel;
CrudEventType type;
HttpRequest<?> request;
private final T previousModel;
private final CrudEventType type;
private final HttpRequest<?> request;
/**
* Static helper method for creating a new {@link CrudEventType#UPDATE} CrudEvent.
*
* @param model the new created model.
* @param <T> type of the model.
* @return the new {@link CrudEvent}.
*/
public static <T> CrudEvent<T> create(T model) {
Objects.requireNonNull(model, "Can't create CREATE event with a null model");
return new CrudEvent<>(model, null, CrudEventType.CREATE);
}
/**
* Static helper method for creating a new {@link CrudEventType#DELETE} CrudEvent.
*
* @param model the deleted model.
* @param <T> type of the model.
* @return the new {@link CrudEvent}.
*/
public static <T> CrudEvent<T> delete(T model) {
Objects.requireNonNull(model, "Can't create DELETE event with a null model");
return new CrudEvent<>(null, model, CrudEventType.DELETE);
}
/**
* Static helper method for creating a new CrudEvent.
*
* @param before the model before the update.
* @param after the model after the update.
* @param <T> type of the model.
* @return the new {@link CrudEvent}.
*/
public static <T> CrudEvent<T> of(T before, T after) {
if (before == null && after == null) {
throw new IllegalArgumentException("Both before and after cannot be null");
}
if (before == null) {
return create(after);
}
if (after == null) {
return delete(before);
}
return new CrudEvent<>(after, before, CrudEventType.UPDATE);
}
/**
* @deprecated use the static factory methods.
*/
@Deprecated
public CrudEvent(T model, CrudEventType type) {
this.model = model;
this.type = type;
this.previousModel = null;
this.request = ServerRequestContext.currentRequest().orElse(null);
this(
CrudEventType.DELETE.equals(type) ? null : model,
CrudEventType.DELETE.equals(type) ? model : null,
type,
ServerRequestContext.currentRequest().orElse(null)
);
}
public CrudEvent(T model, T previousModel, CrudEventType type) {
this(model, previousModel, type, ServerRequestContext.currentRequest().orElse(null));
}
public CrudEvent(T model, T previousModel, CrudEventType type, HttpRequest<?> request) {
this.model = model;
this.previousModel = previousModel;
this.type = type;
this.request = ServerRequestContext.currentRequest().orElse(null);
this.request = request;
}
}

View File

@@ -1,13 +1,15 @@
package io.kestra.core.models;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.NotEmpty;
import java.util.*;
import java.util.stream.Collectors;
public record Label(@NotNull String key, @NotNull String value) {
@Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.")
public record Label(@NotEmpty String key, @NotEmpty String value) {
public static final String SYSTEM_PREFIX = "system.";
// system labels

View File

@@ -12,6 +12,8 @@ import lombok.experimental.SuperBuilder;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@io.kestra.core.models.annotations.Plugin
@SuperBuilder
@Getter
@@ -20,6 +22,6 @@ import jakarta.validation.constraints.Pattern;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public abstract class Condition implements Plugin, Rethrow.PredicateChecked<ConditionContext, InternalException> {
@NotNull
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
protected String type;
}

View File

@@ -20,6 +20,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@@ -28,7 +30,7 @@ import java.util.Set;
public abstract class DataFilter<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
@NotNull
@NotBlank
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
private String type;
private Map<String, C> columns;

View File

@@ -19,6 +19,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@@ -27,7 +29,7 @@ import java.util.Set;
public abstract class DataFilterKPI<F extends Enum<F>, C extends ColumnDescriptor<F>> implements io.kestra.core.models.Plugin, IData<F> {
@NotNull
@NotBlank
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
private String type;
private C columns;

View File

@@ -12,6 +12,8 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@@ -26,7 +28,7 @@ public abstract class Chart<P extends ChartOption> implements io.kestra.core.mod
@NotNull
@NotBlank
@Pattern(regexp = "\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
protected String type;
@Valid

View File

@@ -272,7 +272,7 @@ public class Execution implements DeletedInterface, TenantInterface {
}
public Execution withTaskRun(TaskRun taskRun) throws InternalException {
ArrayList<TaskRun> newTaskRunList = new ArrayList<>(this.taskRunList);
ArrayList<TaskRun> newTaskRunList = this.taskRunList == null ? new ArrayList<>() : new ArrayList<>(this.taskRunList);
boolean b = Collections.replaceAll(
newTaskRunList,

View File

@@ -10,6 +10,7 @@ import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import lombok.Builder;
import lombok.Value;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.event.Level;
import jakarta.validation.constraints.NotNull;
@@ -120,6 +121,16 @@ public class LogEntry implements DeletedInterface, TenantInterface {
return logEntry.getTimestamp().toString() + " " + logEntry.getLevel() + " " + logEntry.getMessage();
}
public static String toPrettyString(LogEntry logEntry, Integer maxMessageSize) {
String message;
if (maxMessageSize != null && maxMessageSize > 0) {
message = StringUtils.truncate(logEntry.getMessage(), maxMessageSize);
} else {
message = logEntry.getMessage();
}
return logEntry.getTimestamp().toString() + " " + logEntry.getLevel() + " " + message;
}
public Map<String, String> toMap() {
return Stream
.of(

View File

@@ -52,6 +52,7 @@ public class TaskRun implements TenantInterface {
@With
@JsonInclude(JsonInclude.Include.ALWAYS)
@Nullable
Variables outputs;
@NotNull
@@ -64,7 +65,6 @@ public class TaskRun implements TenantInterface {
Boolean dynamic;
// Set it to true to force execution even if the execution is killed
@Nullable
@With
Boolean forceExecution;
@@ -217,7 +217,7 @@ public class TaskRun implements TenantInterface {
public boolean isSame(TaskRun taskRun) {
return this.getId().equals(taskRun.getId()) &&
((this.getValue() == null && taskRun.getValue() == null) || (this.getValue() != null && this.getValue().equals(taskRun.getValue()))) &&
((this.getIteration() == null && taskRun.getIteration() == null) || (this.getIteration() != null && this.getIteration().equals(taskRun.getIteration()))) ;
((this.getIteration() == null && taskRun.getIteration() == null) || (this.getIteration() != null && this.getIteration().equals(taskRun.getIteration())));
}
public String toString(boolean pretty) {
@@ -249,7 +249,7 @@ public class TaskRun implements TenantInterface {
* This method is used when the retry is apply on a task
* but the retry type is NEW_EXECUTION
*
* @param retry Contains the retry configuration
* @param retry Contains the retry configuration
* @param execution Contains the attempt number and original creation date
* @return The next retry date, null if maxAttempt || maxDuration is reached
*/
@@ -270,6 +270,7 @@ public class TaskRun implements TenantInterface {
/**
* This method is used when the Retry definition comes from the flow
*
* @param retry The retry configuration
* @return The next retry date, null if maxAttempt || maxDuration is reached
*/

View File

@@ -3,7 +3,6 @@ package io.kestra.core.models.flows;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.WorkerGroup;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
@@ -61,12 +60,24 @@ public abstract class AbstractFlow implements FlowInterface {
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(implementation = Object.class, oneOf = {List.class, Map.class})
@Schema(
description = "Labels as a list of Label (key/value pairs) or as a map of string to string.",
oneOf = {
Label[].class,
Map.class
}
)
@Valid
List<Label> labels;
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
@Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
Map<String, Object> variables;
@Valid
private WorkerGroup workerGroup;
}

View File

@@ -61,6 +61,11 @@ public class Flow extends AbstractFlow implements HasUID {
}
});
@Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
Map<String, Object> variables;
@Valid

View File

@@ -5,7 +5,6 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
@@ -18,8 +17,6 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.function.Function;
@SuppressWarnings("deprecation")
@SuperBuilder
@Getter

View File

@@ -1,6 +1,7 @@
package io.kestra.core.models.flows;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -33,6 +34,12 @@ public class Output implements Data {
* The output value. Can be a dynamic expression.
*/
@NotNull
@Schema(
oneOf = {
Object.class,
String.class
}
)
Object value;
/**

View File

@@ -2,6 +2,7 @@ package io.kestra.core.models.flows;
import io.kestra.core.validations.PluginDefaultValidation;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -21,6 +22,10 @@ public class PluginDefault {
@Builder.Default
private final boolean forced = false;
@Schema(
type = "object",
additionalProperties = Schema.AdditionalPropertiesValue.FALSE
)
private final Map<String, Object> values;
}

View File

@@ -12,6 +12,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
@@ -36,6 +37,12 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
@Builder
@NoArgsConstructor
@AllArgsConstructor(access = AccessLevel.PACKAGE)
@Schema(
oneOf = {
Object.class,
String.class
}
)
public class Property<T> {
// By default, durations are stored as numbers.
// We cannot change that globally, as in JDBC/Elastic 'execution.state.duration' must be a number to be able to aggregate them.
@@ -68,7 +75,7 @@ public class Property<T> {
String getExpression() {
return expression;
}
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
@@ -84,9 +91,9 @@ public class Property<T> {
/**
* Build a new Property object with a value already set.<br>
*
* <p>
* A property build with this method will always return the value passed at build time, no rendering will be done.
*
* <p>
* Use {@link #ofExpression(String)} to build a property with a Pebble expression instead.
*/
public static <V> Property<V> ofValue(V value) {
@@ -126,12 +133,12 @@ public class Property<T> {
/**
* Build a new Property object with a Pebble expression.<br>
*
* <p>
* Use {@link #ofValue(Object)} to build a property with a value instead.
*/
public static <V> Property<V> ofExpression(@NotNull String expression) {
Objects.requireNonNull(expression, "'expression' is required");
if(!expression.contains("{")) {
if (!expression.contains("{")) {
throw new IllegalArgumentException("'expression' must be a valid Pebble expression");
}
@@ -140,7 +147,7 @@ public class Property<T> {
/**
* Render a property then convert it to its target type.<br>
*
* <p>
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#as(Class)
@@ -151,14 +158,14 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it to its target type.<br>
*
* <p>
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
*/
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
String rendered = context.render(property.expression, variables);
String rendered = context.render(property.expression, variables);
property.value = MAPPER.convertValue(rendered, clazz);
}
@@ -167,7 +174,7 @@ public class Property<T> {
/**
* Render a property then convert it as a list of target type.<br>
*
* <p>
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
@@ -178,7 +185,7 @@ public class Property<T> {
/**
* Render a property with additional variables, then convert it as a list of target type.<br>
*
* <p>
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
@@ -218,25 +225,25 @@ public class Property<T> {
/**
* Render a property then convert it as a map of target types.<br>
*
* <p>
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
*
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
*/
public static <T, K,V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
return asMap(property, runContext, keyClass, valueClass, Map.of());
}
/**
* Render a property with additional variables, then convert it as a map of target types.<br>
*
* <p>
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
* Warning, due to the caching mechanism, this method is not thread-safe.
*
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public static <T, K,V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
JavaType targetMapType = MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass);

View File

@@ -8,6 +8,8 @@ import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public interface TaskInterface extends Plugin, PluginVersioning {
@NotNull
@@ -17,7 +19,7 @@ public interface TaskInterface extends Plugin, PluginVersioning {
@NotNull
@NotBlank
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Schema(title = "The class name of this task.")
String getType();
}

View File

@@ -11,6 +11,8 @@ import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Flux;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@Plugin
@SuperBuilder(toBuilder = true)
@Getter
@@ -22,7 +24,7 @@ public abstract class LogExporter<T extends Output> implements io.kestra.core.m
protected String id;
@NotBlank
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
protected String type;
public abstract T sendLogs(RunContext runContext, Flux<LogRecord> logRecords) throws Exception;

View File

@@ -8,12 +8,16 @@ public final class LogRecordMapper {
private LogRecordMapper(){}
public static LogRecord mapToLogRecord(LogEntry log) {
return mapToLogRecord(log, null);
}
public static LogRecord mapToLogRecord(LogEntry log, Integer maxMessageSize) {
return LogRecord.builder()
.resource("Kestra")
.timestampEpochNanos(instantInNanos(log.getTimestamp()))
.severity(log.getLevel().name())
.attributes(log.toLogMap())
.bodyValue(LogEntry.toPrettyString(log))
.bodyValue(LogEntry.toPrettyString(log, maxMessageSize))
.build();
}

View File

@@ -1,3 +1,11 @@
package io.kestra.core.models.tasks.runners;
public interface RemoteRunnerInterface {}
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;
public interface RemoteRunnerInterface {
@Schema(
title = "Whether to synchronize working directory from remote runner back to local one after run."
)
Property<Boolean> getSyncWorkingDirectory();
}

View File

@@ -30,6 +30,10 @@ public interface TaskCommands {
Map<String, Object> getAdditionalVars();
default String outputDirectoryName() {
return this.getWorkingDirectory().relativize(this.getOutputDirectory()).toString();
}
Path getWorkingDirectory();
Path getOutputDirectory();

View File

@@ -7,7 +7,6 @@ import io.kestra.core.models.Plugin;
import io.kestra.core.models.PluginVersioning;
import io.kestra.core.models.WorkerJobLifecycle;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.core.runner.Process;
import jakarta.validation.constraints.NotBlank;
@@ -19,13 +18,14 @@ import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.SystemUtils;
import java.io.IOException;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static io.kestra.core.utils.WindowsUtils.windowsToUnixPath;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
/**
* Base class for all task runners.
@@ -37,7 +37,7 @@ import static io.kestra.core.utils.WindowsUtils.windowsToUnixPath;
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public abstract class TaskRunner<T extends TaskRunnerDetailResult> implements Plugin, PluginVersioning, WorkerJobLifecycle {
@NotBlank
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
protected String type;
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)

View File

@@ -47,9 +47,9 @@ abstract public class AbstractTrigger implements TriggerInterface {
@Valid
protected List<@Valid @NotNull Condition> conditions;
@NotNull
@Builder.Default
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
@Schema(defaultValue = "false")
private boolean disabled = false;
@Valid

View File

@@ -185,34 +185,6 @@ public class Trigger extends TriggerContext implements HasUID {
.build();
}
public static Trigger update(Trigger currentTrigger, Trigger newTrigger, ZonedDateTime nextExecutionDate) throws Exception {
Trigger updated = currentTrigger;
// If a backfill is created, we update the currentTrigger
// and set the nextExecutionDate() as the previous one
if (newTrigger.getBackfill() != null) {
updated = currentTrigger.toBuilder()
.backfill(
newTrigger
.getBackfill()
.toBuilder()
.end(newTrigger.getBackfill().getEnd() != null ? newTrigger.getBackfill().getEnd() : ZonedDateTime.now())
.currentDate(
newTrigger.getBackfill().getStart()
)
.previousNextExecutionDate(
currentTrigger.getNextExecutionDate())
.build())
.build();
}
return updated.toBuilder()
.nextExecutionDate(newTrigger.getDisabled() ?
null : nextExecutionDate)
.disabled(newTrigger.getDisabled())
.build();
}
public Trigger resetExecution(Flow flow, Execution execution, ConditionContext conditionContext) {
boolean disabled = this.getStopAfter() != null ? this.getStopAfter().contains(execution.getState().getCurrent()) : this.getDisabled();
if (!disabled) {
@@ -276,27 +248,22 @@ public class Trigger extends TriggerContext implements HasUID {
.build();
}
public Trigger initBackfill(Trigger newTrigger) {
// If a backfill is created, we update the currentTrigger
public Trigger withBackfill(final Backfill backfill) {
Trigger updated = this;
// If a backfill is created, we update the trigger
// and set the nextExecutionDate() as the previous one
if (newTrigger.getBackfill() != null) {
return this.toBuilder()
if (backfill != null) {
updated = this.toBuilder()
.backfill(
newTrigger
.getBackfill()
backfill
.toBuilder()
.end(newTrigger.getBackfill().getEnd() != null ? newTrigger.getBackfill().getEnd() : ZonedDateTime.now())
.currentDate(
newTrigger.getBackfill().getStart()
)
.previousNextExecutionDate(
this.getNextExecutionDate())
.end(backfill.getEnd() != null ? backfill.getEnd() : ZonedDateTime.now())
.currentDate(backfill.getStart())
.previousNextExecutionDate(this.getNextExecutionDate())
.build())
.build();
}
return this;
return updated;
}
// if the next date is after the backfill end, we remove the backfill

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.Getter;
@@ -46,6 +47,7 @@ public class TriggerContext {
@Nullable
private List<State.Type> stopAfter;
@Schema(defaultValue = "false")
private Boolean disabled = Boolean.FALSE;
protected TriggerContext(TriggerContextBuilder<?, ?> b) {

View File

@@ -7,6 +7,7 @@ import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
public interface TriggerInterface extends Plugin, PluginVersioning {
@NotNull
@@ -17,7 +18,7 @@ public interface TriggerInterface extends Plugin, PluginVersioning {
@NotNull
@NotBlank
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
@Schema(title = "The class name for this current trigger.")
String getType();

View File

@@ -8,6 +8,8 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import static io.kestra.core.utils.RegexPatterns.JAVA_IDENTIFIER_REGEX;
@io.kestra.core.models.annotations.Plugin
@SuperBuilder(toBuilder = true)
@Getter
@@ -15,6 +17,6 @@ import lombok.experimental.SuperBuilder;
public abstract class AdditionalPlugin implements Plugin {
@NotNull
@NotBlank
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
@Pattern(regexp = JAVA_IDENTIFIER_REGEX)
protected String type;
}

View File

@@ -6,6 +6,12 @@ import lombok.Getter;
import lombok.ToString;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.CRC32;
@AllArgsConstructor
@Getter
@@ -14,5 +20,59 @@ import java.net.URL;
public class ExternalPlugin {
private final URL location;
private final URL[] resources;
private final long crc32;
private volatile Long crc32; // lazy-val
public ExternalPlugin(URL location, URL[] resources) {
this.location = location;
this.resources = resources;
}
public Long getCrc32() {
if (this.crc32 == null) {
synchronized (this) {
if (this.crc32 == null) {
this.crc32 = computeJarCrc32(location);
}
}
}
return crc32;
}
/**
* Compute a CRC32 of the JAR File without reading the whole file
*
* @param location of the JAR File.
* @return the CRC32 of {@code -1} if the checksum can't be computed.
*/
private static long computeJarCrc32(final URL location) {
CRC32 crc = new CRC32();
try (JarFile jar = new JarFile(location.toURI().getPath(), false)) {
Enumeration<JarEntry> entries = jar.entries();
byte[] buffer = new byte[Long.BYTES]; // reusable buffer to avoid re-allocation
while (entries.hasMoreElements()) {
JarEntry entry = entries.nextElement();
crc.update(entry.getName().getBytes(StandardCharsets.UTF_8));
updateCrc32WithLong(crc, buffer, entry.getSize());
updateCrc32WithLong(crc, buffer, entry.getCrc());
}
return crc.getValue();
} catch (Exception e) {
return -1;
}
}
private static void updateCrc32WithLong(CRC32 crc32, byte[] reusable, long val) {
// fast long -> byte conversion
reusable[0] = (byte) (val >>> 56);
reusable[1] = (byte) (val >>> 48);
reusable[2] = (byte) (val >>> 40);
reusable[3] = (byte) (val >>> 32);
reusable[4] = (byte) (val >>> 24);
reusable[5] = (byte) (val >>> 16);
reusable[6] = (byte) (val >>> 8);
reusable[7] = (byte) val;
crc32.update(reusable);;
}
}

View File

@@ -46,6 +46,7 @@ public class PluginClassLoader extends URLClassLoader {
+ "|dev.failsafe"
+ "|reactor"
+ "|io.opentelemetry"
+ "|io.netty"
+ ")\\..*$");
private final ClassLoader parent;

View File

@@ -51,8 +51,7 @@ public class PluginResolver {
final List<URL> resources = resolveUrlsForPluginPath(path);
plugins.add(new ExternalPlugin(
path.toUri().toURL(),
resources.toArray(new URL[0]),
computeJarCrc32(path)
resources.toArray(new URL[0])
));
}
} catch (final InvalidPathException | MalformedURLException e) {
@@ -124,33 +123,5 @@ public class PluginResolver {
return urls;
}
/**
* Compute a CRC32 of the JAR File without reading the whole file
*
* @param location of the JAR File.
* @return the CRC32 of {@code -1} if the checksum can't be computed.
*/
private static long computeJarCrc32(final Path location) {
CRC32 crc = new CRC32();
try (JarFile jar = new JarFile(location.toFile(), false)) {
Enumeration<JarEntry> entries = jar.entries();
while (entries.hasMoreElements()) {
JarEntry entry = entries.nextElement();
crc.update(entry.getName().getBytes());
crc.update(longToBytes(entry.getSize()));
crc.update(longToBytes(entry.getCrc()));
}
} catch (Exception e) {
return -1;
}
return crc.getValue();
}
private static byte[] longToBytes(long x) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.putLong(x);
return buffer.array();
}
}

View File

@@ -26,7 +26,7 @@ public interface QueueFactoryInterface {
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
String MULTIPLE_CONDITION_EVENT_NAMED = "multipleConditionEventQueue";
QueueInterface<Execution> execution();
@@ -58,5 +58,5 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<ExecutionRunning> executionRunning();
QueueInterface<MultipleConditionEvent> multipleConditionEvent();
}

View File

@@ -106,6 +106,8 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
Integer purge(Execution execution);
Integer purge(List<Execution> executions);
List<DailyExecutionStatistics> dailyStatisticsForAllTenants(
@Nullable String query,
@Nullable String namespace,

View File

@@ -25,8 +25,8 @@ public interface FlowRepositoryInterface {
* Used only if result is used internally and not exposed to the user.
* It is useful when we want to restart/resume a flow.
*/
default Flow findByExecutionWithoutAcl(Execution execution) {
Optional<Flow> find = this.findByIdWithoutAcl(
default FlowWithSource findByExecutionWithoutAcl(Execution execution) {
Optional<FlowWithSource> find = this.findByIdWithSourceWithoutAcl(
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),

View File

@@ -90,6 +90,8 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
Integer purge(Execution execution);
Integer purge(List<Execution> executions);
void deleteByQuery(String tenantId, String executionId, String taskId, String taskRunId, Level minLevel, Integer attempt);
void deleteByQuery(String tenantId, String namespace, String flowId, String triggerId);

View File

@@ -29,6 +29,8 @@ public interface MetricRepositoryInterface extends SaveRepositoryInterface<Metri
Integer purge(Execution execution);
Integer purge(List<Execution> executions);
Flux<MetricEntry> findAllAsync(@Nullable String tenantId);
default Function<String, String> sortMapping() throws IllegalArgumentException {

View File

@@ -0,0 +1,31 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.IdUtils;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
import lombok.With;
@Value
@AllArgsConstructor
@Builder
public class ConcurrencyLimit implements HasUID {
@NotNull
String tenantId;
@NotNull
String namespace;
@NotNull
String flowId;
@With
Integer running;
@Override
public String uid() {
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId);
}
}

View File

@@ -32,5 +32,7 @@ public class ExecutionRunning implements HasUID {
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId, this.execution.getId());
}
public enum ConcurrencyState { CREATED, RUNNING, QUEUED, CANCELLED, FAILED }
// Note: the KILLED state is only used in the Kafka implementation to difference between purging terminated running execution (null)
// and purging killed execution which need special treatment
public enum ConcurrencyState { CREATED, RUNNING, QUEUED, CANCELLED, FAILED, KILLED }
}

View File

@@ -82,6 +82,8 @@ public abstract class FilesService {
}
private static String resolveUniqueNameForFile(final Path path) {
return IdUtils.from(path.toString()) + "-" + path.toFile().getName();
String filename = path.getFileName().toString();
String encodedFilename = java.net.URLEncoder.encode(filename, java.nio.charset.StandardCharsets.UTF_8);
return IdUtils.from(path.toString()) + "-" + encodedFilename;
}
}

View File

@@ -1,7 +1,6 @@
package io.kestra.core.runners;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.KestraRuntimeException;
@@ -73,31 +72,28 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
public class FlowInputOutput {
private static final Pattern URI_PATTERN = Pattern.compile("^[a-z]+:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)$");
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml();
private final StorageInterface storageInterface;
private final Optional<String> secretKey;
private final RunContextFactory runContextFactory;
private final VariableRenderer variableRenderer;
@Inject
public FlowInputOutput(
StorageInterface storageInterface,
RunContextFactory runContextFactory,
VariableRenderer variableRenderer,
@Nullable @Value("${kestra.encryption.secret-key}") String secretKey
) {
this.storageInterface = storageInterface;
this.runContextFactory = runContextFactory;
this.secretKey = Optional.ofNullable(secretKey);
this.variableRenderer = variableRenderer;
}
/**
* Validate all the inputs of a given execution of a flow.
*
* @param inputs The Flow's inputs.
* @param execution The Execution.
* @param data The Execution's inputs data.
* @param inputs The Flow's inputs.
* @param execution The Execution.
* @param data The Execution's inputs data.
* @return The list of {@link InputAndValue}.
*/
public Mono<List<InputAndValue>> validateExecutionInputs(final List<Input<?>> inputs,
@@ -105,10 +101,11 @@ public class FlowInputOutput {
final Execution execution,
final Publisher<CompletedPart> data) {
if (ListUtils.isEmpty(inputs)) return Mono.just(Collections.emptyList());
return readData(inputs, execution, data, false).map(inputData -> resolveInputs(inputs, flow, execution, inputData));
return readData(inputs, execution, data, false)
.map(inputData -> resolveInputs(inputs, flow, execution, inputData, false));
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -122,7 +119,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return this.readExecutionInputs(flow.getInputs(), flow, execution, data);
}
/**
* Reads all the inputs of a given execution of a flow.
*
@@ -137,7 +134,7 @@ public class FlowInputOutput {
final Publisher<CompletedPart> data) {
return readData(inputs, execution, data, true).map(inputData -> this.readExecutionInputs(inputs, flow, execution, inputData));
}
private Mono<Map<String, Object>> readData(List<Input<?>> inputs, Execution execution, Publisher<CompletedPart> data, boolean uploadFiles) {
return Flux.from(data)
.publishOn(Schedulers.boundedElastic())
@@ -220,7 +217,7 @@ public class FlowInputOutput {
final Execution execution,
final Map<String, ?> data
) {
Map<String, Object> resolved = this.resolveInputs(inputs, flow, execution, data)
Map<String, Object> resolved = this.resolveInputs(inputs, flow, execution, data, true)
.stream()
.filter(InputAndValue::enabled)
.map(it -> {
@@ -233,7 +230,7 @@ public class FlowInputOutput {
.collect(HashMap::new, (m,v)-> m.put(v.getKey(), v.getValue()), HashMap::putAll);
return MapUtils.flattenToNestedMap(resolved);
}
/**
* Utility method for retrieving types inputs.
*
@@ -242,12 +239,21 @@ public class FlowInputOutput {
* @param data The Execution's inputs data.
* @return The Map of typed inputs.
*/
@VisibleForTesting
public List<InputAndValue> resolveInputs(
final List<Input<?>> inputs,
final FlowInterface flow,
final Execution execution,
final Map<String, ?> data
) {
return resolveInputs(inputs, flow, execution, data, true);
}
public List<InputAndValue> resolveInputs(
final List<Input<?>> inputs,
final FlowInterface flow,
final Execution execution,
final Map<String, ?> data,
final boolean decryptSecrets
) {
if (inputs == null) {
return Collections.emptyList();
@@ -257,7 +263,7 @@ public class FlowInputOutput {
.map(input -> ResolvableInput.of(input,data.get(input.getId())))
.collect(Collectors.toMap(it -> it.get().input().getId(), Function.identity(), (o1, o2) -> o1, LinkedHashMap::new)));
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap));
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap, decryptSecrets));
return resolvableInputMap.values().stream().map(ResolvableInput::get).toList();
}
@@ -267,7 +273,8 @@ public class FlowInputOutput {
final @NotNull ResolvableInput resolvable,
final FlowInterface flow,
final @NotNull Execution execution,
final @NotNull Map<String, ResolvableInput> inputs) {
final @NotNull Map<String, ResolvableInput> inputs,
final boolean decryptSecrets) {
// return immediately if the input is already resolved
if (resolvable.isResolved()) return resolvable.get();
@@ -275,9 +282,10 @@ public class FlowInputOutput {
Input<?> input = resolvable.get().input();
try {
// resolve all input dependencies and check whether input is enabled
final Map<String, InputAndValue> dependencies = resolveAllDependentInputs(input, flow, execution, inputs);
final RunContext runContext = buildRunContextForExecutionAndInputs(flow, execution, dependencies);
// Resolve all input dependencies and check whether input is enabled
// Note: Secrets are always decrypted here because they can be part of expressions used to render inputs such as SELECT & MULTI_SELECT.
final Map<String, InputAndValue> dependencies = resolveAllDependentInputs(input, flow, execution, inputs, true);
final RunContext runContext = buildRunContextForExecutionAndInputs(flow, execution, dependencies, true);
boolean isInputEnabled = dependencies.isEmpty() || dependencies.values().stream().allMatch(InputAndValue::enabled);
@@ -312,13 +320,13 @@ public class FlowInputOutput {
}
});
resolvable.setInput(input);
Object value = resolvable.get().value();
// resolve default if needed
if (value == null && input.getDefaults() != null) {
value = resolveDefaultValue(input, runContext);
RunContext runContextForDefault = decryptSecrets ? runContext : buildRunContextForExecutionAndInputs(flow, execution, dependencies, false);
value = resolveDefaultValue(input, runContextForDefault);
resolvable.isDefault(true);
}
@@ -376,8 +384,8 @@ public class FlowInputOutput {
private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
}
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies) {
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies, final boolean decryptSecrets) {
Map<String, Object> flattenInputs = MapUtils.flattenToNestedMap(dependencies.entrySet()
.stream()
.collect(HashMap::new, (m, v) -> m.put(v.getKey(), v.getValue().value()), HashMap::putAll)
@@ -391,10 +399,10 @@ public class FlowInputOutput {
flattenInputs.put(input.getId(), null);
}
}
return runContextFactory.of(flow, execution, vars -> vars.withInputs(flattenInputs));
return runContextFactory.of(flow, execution, vars -> vars.withInputs(flattenInputs), decryptSecrets);
}
private Map<String, InputAndValue> resolveAllDependentInputs(final Input<?> input, final FlowInterface flow, final Execution execution, final Map<String, ResolvableInput> inputs) {
private Map<String, InputAndValue> resolveAllDependentInputs(final Input<?> input, final FlowInterface flow, final Execution execution, final Map<String, ResolvableInput> inputs, final boolean decryptSecrets) {
return Optional.ofNullable(input.getDependsOn())
.map(DependsOn::inputs)
.stream()
@@ -402,7 +410,7 @@ public class FlowInputOutput {
.filter(id -> !id.equals(input.getId()))
.map(inputs::get)
.filter(Objects::nonNull) // input may declare unknown or non-necessary dependencies. Let's ignore.
.map(it -> resolveInputValue(it, flow, execution, inputs))
.map(it -> resolveInputValue(it, flow, execution, inputs, decryptSecrets))
.collect(Collectors.toMap(it -> it.input().getId(), Function.identity()));
}

View File

@@ -500,7 +500,7 @@ public class FlowableUtils {
ArrayList<ResolvedTask> result = new ArrayList<>();
int index = 0;
int iteration = 0;
for (Object current : distinctValue) {
try {
String resolvedValue = current instanceof String stringValue ? stringValue : MAPPER.writeValueAsString(current);
@@ -508,7 +508,7 @@ public class FlowableUtils {
result.add(ResolvedTask.builder()
.task(task)
.value(resolvedValue)
.iteration(index++)
.iteration(iteration)
.parentId(parentTaskRun.getId())
.build()
);
@@ -516,6 +516,7 @@ public class FlowableUtils {
} catch (JsonProcessingException e) {
throw new IllegalVariableEvaluationException(e);
}
iteration++;
}
return result;

View File

@@ -0,0 +1,13 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.utils.IdUtils;
public record MultipleConditionEvent(Flow flow, Execution execution) implements HasUID {
@Override
public String uid() {
return IdUtils.fromParts(flow.uidWithoutRevision(), execution.getId());
}
}

View File

@@ -41,6 +41,9 @@ public class RunContextFactory {
@Inject
protected VariableRenderer variableRenderer;
@Inject
protected SecureVariableRendererFactory secureVariableRendererFactory;
@Inject
protected StorageInterface storageInterface;
@@ -82,22 +85,33 @@ public class RunContextFactory {
public RunContext of(FlowInterface flow, Execution execution) {
return of(flow, execution, Function.identity());
}
public RunContext of(FlowInterface flow, Execution execution, boolean decryptVariable) {
return of(flow, execution, Function.identity(), decryptVariable);
}
public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier) {
return of(flow, execution, runVariableModifier, true);
}
public RunContext of(FlowInterface flow, Execution execution, Function<RunVariables.Builder, RunVariables.Builder> runVariableModifier, boolean decryptVariables) {
RunContextLogger runContextLogger = runContextLoggerFactory.create(execution);
VariableRenderer variableRenderer = decryptVariables ? this.variableRenderer : secureVariableRendererFactory.createOrGet();
return newBuilder()
// Logger
.withLogger(runContextLogger)
// Execution
.withPluginConfiguration(Map.of())
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, flowService))
.withVariableRenderer(variableRenderer)
.withVariables(runVariableModifier.apply(
newRunVariablesBuilder()
.withFlow(flow)
.withExecution(execution)
.withDecryptVariables(true)
.withSecretInputs(secretInputsFromFlow(flow))
newRunVariablesBuilder()
.withFlow(flow)
.withExecution(execution)
.withDecryptVariables(decryptVariables)
.withSecretInputs(secretInputsFromFlow(flow))
)
.build(runContextLogger, PropertyContext.create(variableRenderer)))
.withSecretInputs(secretInputsFromFlow(flow))
@@ -109,7 +123,7 @@ public class RunContextFactory {
}
public RunContext of(FlowInterface flow, Task task, Execution execution, TaskRun taskRun, boolean decryptVariables) {
return this.of(flow, task, execution, taskRun, decryptVariables, variableRenderer);
return this.of(flow, task, execution, taskRun, decryptVariables, this.variableRenderer);
}
public RunContext of(FlowInterface flow, Task task, Execution execution, TaskRun taskRun, boolean decryptVariables, VariableRenderer variableRenderer) {
@@ -147,7 +161,7 @@ public class RunContextFactory {
.withFlow(flow)
.withTrigger(trigger)
.withSecretInputs(secretInputsFromFlow(flow))
.build(runContextLogger, PropertyContext.create(variableRenderer))
.build(runContextLogger, PropertyContext.create(this.variableRenderer))
)
.withSecretInputs(secretInputsFromFlow(flow))
.withTrigger(trigger)
@@ -226,7 +240,7 @@ public class RunContextFactory {
// inject mandatory services and config
.withApplicationContext(applicationContext) // TODO - ideally application should not be injected here
.withMeterRegistry(metricRegistry)
.withVariableRenderer(variableRenderer)
.withVariableRenderer(this.variableRenderer)
.withStorageInterface(storageInterface)
.withSecretKey(secretKey)
.withWorkingDir(workingDirFactory.createWorkingDirectory())

View File

@@ -99,7 +99,7 @@ public final class RunVariables {
* @return a new immutable {@link Map}.
*/
static Map<String, Object> of(final AbstractTrigger trigger) {
return ImmutableMap.of(
return Map.of(
"id", trigger.getId(),
"type", trigger.getType()
);
@@ -281,16 +281,19 @@ public final class RunVariables {
}
if (flow != null && flow.getInputs() != null) {
// Create a new PropertyContext with 'flow' variables which are required by some pebble expressions.
PropertyContextWithVariables context = new PropertyContextWithVariables(propertyContext, Map.of("flow", RunVariables.of(flow)));
// we add default inputs value from the flow if not already set, this will be useful for triggers
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> {
try {
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
} catch (IllegalVariableEvaluationException e) {
throw new RuntimeException("Unable to inject default value for input '" + input.getId() + "'", e);
}
});
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> {
try {
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, context));
} catch (IllegalVariableEvaluationException e) {
// Silent catch, if an input depends on another input, or a variable that is populated at runtime / input filling time, we can't resolve it here.
}
});
}
if (!inputs.isEmpty()) {
@@ -390,4 +393,20 @@ public final class RunVariables {
}
private RunVariables(){}
private record PropertyContextWithVariables(
PropertyContext delegate,
Map<String, Object> variables
) implements PropertyContext {
@Override
public String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return delegate.render(inline, variables.isEmpty() ? this.variables : variables);
}
@Override
public Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return delegate.render(inline, variables.isEmpty() ? this.variables : variables);
}
}
}

View File

@@ -45,7 +45,7 @@ final class Secret {
for (var entry: data.entrySet()) {
if (entry.getValue() instanceof Map map) {
// if some value are of type EncryptedString we decode them and replace the object
if (EncryptedString.TYPE.equalsIgnoreCase((String)map.get("type"))) {
if (map.get("type") instanceof String typeStr && EncryptedString.TYPE.equalsIgnoreCase(typeStr)) {
try {
String decoded = decrypt((String) map.get("value"));
decryptedMap.put(entry.getKey(), decoded);

View File

@@ -0,0 +1,39 @@
package io.kestra.core.runners;
import io.kestra.core.runners.pebble.PebbleEngineFactory;
import io.kestra.core.runners.pebble.functions.SecretFunction;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.List;
@Singleton
public class SecureVariableRendererFactory {
private final PebbleEngineFactory pebbleEngineFactory;
private final ApplicationContext applicationContext;
private VariableRenderer secureVariableRenderer;
@Inject
public SecureVariableRendererFactory(ApplicationContext applicationContext, PebbleEngineFactory pebbleEngineFactory) {
this.pebbleEngineFactory = pebbleEngineFactory;
this.applicationContext = applicationContext;
}
/**
* Creates or returns the existing secured {@link VariableRenderer} instance.
*
* @return the secured {@link VariableRenderer} instance
*/
public synchronized VariableRenderer createOrGet() {
if (this.secureVariableRenderer == null) {
// Explicitly create a new instance through the application context to ensure
// eventual custom VariableRenderer implementation is used
secureVariableRenderer = applicationContext.createBean(VariableRenderer.class);
secureVariableRenderer.setPebbleEngine(pebbleEngineFactory.createWithMaskedFunctions(secureVariableRenderer, List.of(SecretFunction.NAME)));
}
return secureVariableRenderer;
}
}

View File

@@ -2,121 +2,44 @@ package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.pebble.*;
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.annotation.Nullable;
import io.pebbletemplates.pebble.PebbleEngine;
import io.pebbletemplates.pebble.error.AttributeNotFoundException;
import io.pebbletemplates.pebble.error.PebbleException;
import io.pebbletemplates.pebble.extension.Extension;
import io.pebbletemplates.pebble.extension.Function;
import io.pebbletemplates.pebble.template.PebbleTemplate;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Getter;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Singleton
public class VariableRenderer {
private static final Pattern RAW_PATTERN = Pattern.compile("(\\{%-*\\s*raw\\s*-*%}(.*?)\\{%-*\\s*endraw\\s*-*%})");
public static final int MAX_RENDERING_AMOUNT = 100;
private final PebbleEngine pebbleEngine;
private PebbleEngine pebbleEngine;
private final VariableConfiguration variableConfiguration;
@Inject
public VariableRenderer(ApplicationContext applicationContext, @Nullable VariableConfiguration variableConfiguration) {
this(applicationContext, variableConfiguration, Collections.emptyList());
this(applicationContext.getBean(PebbleEngineFactory.class), variableConfiguration);
}
public VariableRenderer(ApplicationContext applicationContext, @Nullable VariableConfiguration variableConfiguration, List<String> functionsToMask) {
public VariableRenderer(PebbleEngineFactory pebbleEngineFactory, @Nullable VariableConfiguration variableConfiguration) {
this.variableConfiguration = variableConfiguration != null ? variableConfiguration : new VariableConfiguration();
PebbleEngine.Builder pebbleBuilder = new PebbleEngine.Builder()
.registerExtensionCustomizer(ExtensionCustomizer::new)
.strictVariables(true)
.cacheActive(this.variableConfiguration.getCacheEnabled())
.newLineTrimming(false)
.autoEscaping(false);
List<Extension> extensions = applicationContext.getBeansOfType(Extension.class).stream()
.map(e -> functionsToMask.stream().anyMatch(excludedFunction -> e.getFunctions().containsKey(excludedFunction))
? extensionWithMaskedFunctions(e, functionsToMask)
: e)
.toList();
extensions.forEach(pebbleBuilder::extension);
if (this.variableConfiguration.getCacheEnabled()) {
pebbleBuilder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
}
this.pebbleEngine = pebbleBuilder.build();
this.pebbleEngine = pebbleEngineFactory.create();
}
private Extension extensionWithMaskedFunctions(Extension initialExtension, List<String> maskedFunctions) {
return (Extension) Proxy.newProxyInstance(
initialExtension.getClass().getClassLoader(),
new Class[]{Extension.class},
(proxy, method, methodArgs) -> {
if (method.getName().equals("getFunctions")) {
return initialExtension.getFunctions().entrySet().stream()
.map(entry -> {
if (maskedFunctions.contains(entry.getKey())) {
return Map.entry(entry.getKey(), this.maskedFunctionProxy(entry.getValue()));
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
return Map.entry(entry.getKey(), this.variableRendererProxy(entry.getValue()));
}
return entry;
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
return method.invoke(initialExtension, methodArgs);
}
);
public void setPebbleEngine(final PebbleEngine pebbleEngine) {
this.pebbleEngine = pebbleEngine;
}
private Function variableRendererProxy(Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class, RenderingFunctionInterface.class},
(functionProxy, functionMethod, functionArgs) -> {
if (functionMethod.getName().equals("variableRenderer")) {
return this;
}
return functionMethod.invoke(initialFunction, functionArgs);
}
);
}
private Function maskedFunctionProxy(Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class},
(functionProxy, functionMethod, functionArgs) -> {
Object result;
try {
result = functionMethod.invoke(initialFunction, functionArgs);
} catch (InvocationTargetException e) {
throw e.getCause();
}
if (functionMethod.getName().equals("execute")) {
return "******";
}
return result;
}
);
}
public static IllegalVariableEvaluationException properPebbleException(PebbleException initialExtension) {
if (initialExtension instanceof AttributeNotFoundException current) {
return new IllegalVariableEvaluationException(

View File

@@ -0,0 +1,118 @@
package io.kestra.core.runners.pebble;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Nullable;
import io.pebbletemplates.pebble.PebbleEngine;
import io.pebbletemplates.pebble.extension.Extension;
import io.pebbletemplates.pebble.extension.Function;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Singleton
public class PebbleEngineFactory {
private final ApplicationContext applicationContext;
private final VariableRenderer.VariableConfiguration variableConfiguration;
@Inject
public PebbleEngineFactory(ApplicationContext applicationContext, @Nullable VariableRenderer.VariableConfiguration variableConfiguration) {
this.applicationContext = applicationContext;
this.variableConfiguration = variableConfiguration;
}
public PebbleEngine create() {
PebbleEngine.Builder builder = newPebbleEngineBuilder();
this.applicationContext.getBeansOfType(Extension.class).forEach(builder::extension);
return builder.build();
}
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
PebbleEngine.Builder builder = newPebbleEngineBuilder();
this.applicationContext.getBeansOfType(Extension.class).stream()
.map(e -> functionsToMask.stream().anyMatch(fun -> e.getFunctions().containsKey(fun))
? extensionWithMaskedFunctions(renderer, e, functionsToMask)
: e)
.forEach(builder::extension);
return builder.build();
}
private PebbleEngine.Builder newPebbleEngineBuilder() {
PebbleEngine.Builder builder = new PebbleEngine.Builder()
.registerExtensionCustomizer(ExtensionCustomizer::new)
.strictVariables(true)
.cacheActive(this.variableConfiguration.getCacheEnabled())
.newLineTrimming(false)
.autoEscaping(false);
if (this.variableConfiguration.getCacheEnabled()) {
builder = builder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
}
return builder;
}
private Extension extensionWithMaskedFunctions(VariableRenderer renderer, Extension initialExtension, List<String> maskedFunctions) {
return (Extension) Proxy.newProxyInstance(
initialExtension.getClass().getClassLoader(),
new Class[]{Extension.class},
(proxy, method, methodArgs) -> {
if (method.getName().equals("getFunctions")) {
return initialExtension.getFunctions().entrySet().stream()
.map(entry -> {
if (maskedFunctions.contains(entry.getKey())) {
return Map.entry(entry.getKey(), this.maskedFunctionProxy(entry.getValue()));
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
return Map.entry(entry.getKey(), this.variableRendererProxy(renderer, entry.getValue()));
}
return entry;
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
return method.invoke(initialExtension, methodArgs);
}
);
}
private Function variableRendererProxy(VariableRenderer renderer, Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class, RenderingFunctionInterface.class},
(functionProxy, functionMethod, functionArgs) -> {
if (functionMethod.getName().equals("variableRenderer")) {
return renderer;
}
return functionMethod.invoke(initialFunction, functionArgs);
}
);
}
private Function maskedFunctionProxy(Function initialFunction) {
return (Function) Proxy.newProxyInstance(
initialFunction.getClass().getClassLoader(),
new Class[]{Function.class},
(functionProxy, functionMethod, functionArgs) -> {
Object result;
try {
result = functionMethod.invoke(initialFunction, functionArgs);
} catch (InvocationTargetException e) {
throw e.getCause();
}
if (functionMethod.getName().equals("execute")) {
return "******";
}
return result;
}
);
}
}

View File

@@ -38,7 +38,7 @@ public class KvFunction implements Function {
String key = getKey(args, self, lineNumber);
String namespace = (String) args.get(NAMESPACE_ARG);
Boolean errorOnMissing = Optional.ofNullable((Boolean) args.get(ERROR_ON_MISSING_ARG)).orElse(true);
boolean errorOnMissing = Optional.ofNullable((Boolean) args.get(ERROR_ON_MISSING_ARG)).orElse(true);
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
String flowNamespace = flow.get(NAMESPACE_ARG);
@@ -53,11 +53,16 @@ public class KvFunction implements Function {
// we didn't check allowedNamespace here as it's checked in the kvStoreService itself
value = kvStoreService.get(flowTenantId, namespace, flowNamespace).getValue(key);
}
} catch (ResourceExpiredException e) {
if (errorOnMissing) {
throw new PebbleException(e, e.getMessage(), lineNumber, self.getName());
}
value = Optional.empty();
} catch (Exception e) {
throw new PebbleException(e, e.getMessage(), lineNumber, self.getName());
}
if (value.isEmpty() && errorOnMissing == Boolean.TRUE) {
if (value.isEmpty() && errorOnMissing) {
throw new PebbleException(null, "The key '" + key + "' does not exist in the namespace '" + namespace + "'.", lineNumber, self.getName());
}
@@ -85,4 +90,4 @@ public class KvFunction implements Function {
return (String) args.get(KEY_ARGS);
}
}
}

View File

@@ -163,31 +163,28 @@ public final class JacksonMapper {
.build();
}
public static Pair<JsonNode, JsonNode> getBiDirectionalDiffs(Object previous, Object current) {
JsonNode previousJson = MAPPER.valueToTree(previous);
JsonNode newJson = MAPPER.valueToTree(current);
public static Pair<JsonNode, JsonNode> getBiDirectionalDiffs(Object before, Object after) {
JsonNode beforeNode = MAPPER.valueToTree(before);
JsonNode afterNode = MAPPER.valueToTree(after);
JsonNode patchPrevToNew = JsonDiff.asJson(previousJson, newJson);
JsonNode patchNewToPrev = JsonDiff.asJson(newJson, previousJson);
JsonNode patch = JsonDiff.asJson(beforeNode, afterNode);
JsonNode revert = JsonDiff.asJson(afterNode, beforeNode);
return Pair.of(patchPrevToNew, patchNewToPrev);
return Pair.of(patch, revert);
}
public static String applyPatches(Object object, List<JsonNode> patches) throws JsonProcessingException {
public static JsonNode applyPatchesOnJsonNode(JsonNode jsonObject, List<JsonNode> patches) {
for (JsonNode patch : patches) {
try {
// Required for ES
if (patch.findValue("value") == null) {
((ObjectNode) patch.get(0)).set("value", (JsonNode) null);
if (patch.findValue("value") == null && !patch.isEmpty()) {
((ObjectNode) patch.get(0)).set("value", null);
}
JsonNode current = MAPPER.valueToTree(object);
object = JsonPatch.fromJson(patch).apply(current);
jsonObject = JsonPatch.fromJson(patch).apply(jsonObject);
} catch (IOException | JsonPatchException e) {
throw new RuntimeException(e);
}
}
return MAPPER.writeValueAsString(object);
return jsonObject;
}
}

View File

@@ -14,6 +14,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.input.InputAndValue;
import io.kestra.core.models.hierarchies.AbstractGraphTask;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
@@ -56,8 +57,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwFunction;
import static io.kestra.core.utils.Rethrow.throwPredicate;
import static io.kestra.core.utils.Rethrow.*;
@Singleton
@Slf4j
@@ -122,21 +122,38 @@ public class ExecutionService {
* Retry set the given taskRun in created state
* and return the execution in running state
**/
public Execution retryTask(Execution execution, String taskRunId) {
List<TaskRun> newTaskRuns = execution
.getTaskRunList()
.stream()
.map(taskRun -> {
if (taskRun.getId().equals(taskRunId)) {
return taskRun
.withState(State.Type.CREATED);
public Execution retryTask(Execution execution, Flow flow, String taskRunId) throws InternalException {
TaskRun taskRun = execution.findTaskRunByTaskRunId(taskRunId).withState(State.Type.CREATED);
List<TaskRun> taskRunList = execution.getTaskRunList();
if (taskRun.getParentTaskRunId() != null) {
// we need to find the parent to remove any errors or finally tasks already executed
TaskRun parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
Task parentTask = flow.findTaskByTaskId(parentTaskRun.getTaskId());
if (parentTask instanceof FlowableTask<?> flowableTask) {
if (flowableTask.getErrors() != null) {
List<Task> allErrors = Stream.concat(flowableTask.getErrors().stream()
.filter(task -> task.isFlowable() && ((FlowableTask<?>) task).getErrors() != null)
.flatMap(task -> ((FlowableTask<?>) task).getErrors().stream()),
flowableTask.getErrors().stream())
.toList();
allErrors.forEach(error -> taskRunList.removeIf(t -> t.getTaskId().equals(error.getId())));
}
return taskRun;
})
.toList();
if (flowableTask.getFinally() != null) {
List<Task> allFinally = Stream.concat(flowableTask.getFinally().stream()
.filter(task -> task.isFlowable() && ((FlowableTask<?>) task).getFinally() != null)
.flatMap(task -> ((FlowableTask<?>) task).getFinally().stream()),
flowableTask.getFinally().stream())
.toList();
allFinally.forEach(error -> taskRunList.removeIf(t -> t.getTaskId().equals(error.getId())));
}
}
return execution.withTaskRunList(newTaskRuns).withState(State.Type.RUNNING);
return execution.withTaskRunList(taskRunList).withTaskRun(taskRun).withState(State.Type.RUNNING);
}
return execution.withTaskRun(taskRun).withState(State.Type.RUNNING);
}
public Execution retryWaitFor(Execution execution, String flowableTaskRunId) {
@@ -431,7 +448,8 @@ public class ExecutionService {
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
@Nullable List<State.Type> state,
int batchSize
) throws IOException {
PurgeResult purgeResult = this.executionRepository
.find(
@@ -448,24 +466,27 @@ public class ExecutionService {
null,
true
)
.map(throwFunction(execution -> {
.buffer(batchSize)
.map(throwFunction(executions -> {
PurgeResult.PurgeResultBuilder<?, ?> builder = PurgeResult.builder();
if (purgeExecution) {
builder.executionsCount(this.executionRepository.purge(execution));
builder.executionsCount(this.executionRepository.purge(executions));
}
if (purgeLog) {
builder.logsCount(this.logRepository.purge(execution));
builder.logsCount(this.logRepository.purge(executions));
}
if (purgeMetric) {
builder.metricsCount(this.metricRepository.purge(execution));
builder.metricsCount(this.metricRepository.purge(executions));
}
if (purgeStorage) {
URI uri = StorageContext.forExecution(execution).getExecutionStorageURI(StorageContext.KESTRA_SCHEME);
builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), execution.getNamespace(), uri).size());
executions.forEach(throwConsumer(execution -> {
URI uri = StorageContext.forExecution(execution).getExecutionStorageURI(StorageContext.KESTRA_SCHEME);
builder.storagesCount(storageInterface.deleteByPrefix(execution.getTenantId(), execution.getNamespace(), uri).size());
}));
}
return (PurgeResult) builder.build();
@@ -715,8 +736,9 @@ public class ExecutionService {
} else {
newExecution = execution.withState(killingOrAfterKillState);
}
eventPublisher.publishEvent(new CrudEvent<>(newExecution, execution, CrudEventType.UPDATE));
// Because this method is expected to be called by the Executor we can return the Execution
// immediately without publishing a CrudEvent like it's done on pause/resume method.
return newExecution;
}
public Execution kill(Execution execution, FlowInterface flow) {

View File

@@ -3,12 +3,7 @@ package io.kestra.core.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.AbstractTrigger;
@@ -30,16 +25,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -551,23 +537,24 @@ public class FlowService {
return expandAll ? recursiveFlowTopology(new ArrayList<>(), tenant, namespace, id, destinationOnly) : flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly).stream();
}
private Stream<FlowTopology> recursiveFlowTopology(List<FlowId> flowIds, String tenantId, String namespace, String id, boolean destinationOnly) {
private Stream<FlowTopology> recursiveFlowTopology(List<String> visitedTopologies, String tenantId, String namespace, String id, boolean destinationOnly) {
if (flowTopologyRepository.isEmpty()) {
throw noRepositoryException();
}
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, id, destinationOnly);
FlowId flowId = FlowId.of(tenantId, namespace, id, null);
if (flowIds.contains(flowId)) {
return flowTopologies.stream();
}
flowIds.add(flowId);
var flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, id, destinationOnly);
return flowTopologies.stream()
.flatMap(topology -> Stream.of(topology.getDestination(), topology.getSource()))
// recursively fetch child nodes
.flatMap(node -> recursiveFlowTopology(flowIds, node.getTenantId(), node.getNamespace(), node.getId(), destinationOnly));
// ignore already visited topologies
.filter(x -> !visitedTopologies.contains(x.uid()))
.flatMap(topology -> {
visitedTopologies.add(topology.uid());
Stream<FlowTopology> subTopologies = Stream
.of(topology.getDestination(), topology.getSource())
// recursively visit children and parents nodes
.flatMap(relationNode -> recursiveFlowTopology(visitedTopologies, relationNode.getTenantId(), relationNode.getNamespace(), relationNode.getId(), destinationOnly));
return Stream.concat(Stream.of(topology), subTopologies);
});
}
private IllegalStateException noRepositoryException() {

View File

@@ -1,8 +1,9 @@
package io.kestra.executor;
package io.kestra.core.services;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import jakarta.annotation.Nullable;
import jakarta.inject.Singleton;
import java.util.Collections;
@@ -14,6 +15,7 @@ public class SkipExecutionService {
private volatile List<FlowId> skipFlows = Collections.emptyList();
private volatile List<NamespaceId> skipNamespaces = Collections.emptyList();
private volatile List<String> skipTenants = Collections.emptyList();
private volatile List<String> skipIndexerRecords = Collections.emptyList();
public synchronized void setSkipExecutions(List<String> skipExecutions) {
this.skipExecutions = skipExecutions == null ? Collections.emptyList() : skipExecutions;
@@ -31,6 +33,10 @@ public class SkipExecutionService {
this.skipTenants = skipTenants == null ? Collections.emptyList() : skipTenants;
}
public synchronized void setSkipIndexerRecords(List<String> skipIndexerRecords) {
this.skipIndexerRecords = skipIndexerRecords == null ? Collections.emptyList() : skipIndexerRecords;
}
/**
* Warning: this method didn't check the flow, so it must be used only when neither of the others can be used.
*/
@@ -46,6 +52,14 @@ public class SkipExecutionService {
return skipExecution(taskRun.getTenantId(), taskRun.getNamespace(), taskRun.getFlowId(), taskRun.getExecutionId());
}
/**
* Skip an indexer records based on its key.
* @param key the record key as computed by <code>QueueService.key(record)</code>, can be null
*/
public boolean skipIndexerRecord(@Nullable String key) {
return key != null && skipIndexerRecords.contains(key);
}
@VisibleForTesting
boolean skipExecution(String tenant, String namespace, String flow, String executionId) {
return (tenant != null && skipTenants.contains(tenant)) ||

View File

@@ -1,5 +1,6 @@
package io.kestra.core.storages.kv;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import java.time.Duration;
@@ -9,6 +10,7 @@ import java.util.Map;
import java.util.Optional;
@Getter
@EqualsAndHashCode
public class KVMetadata {
private String description;
private Instant expirationDate;
@@ -17,14 +19,18 @@ public class KVMetadata {
if (ttl != null && ttl.isNegative()) {
throw new IllegalArgumentException("ttl cannot be negative");
}
this.description = description;
if (ttl != null) {
this.expirationDate = Instant.now().plus(ttl);
}
}
public KVMetadata(String description, Instant expirationDate) {
this.description = description;
this.expirationDate = expirationDate;
}
public KVMetadata(Map<String, String> metadata) {
if (metadata == null) {
return;
@@ -46,4 +52,9 @@ public class KVMetadata {
}
return map;
}
@Override
public String toString() {
return "[description=" + description + ", expirationDate=" + expirationDate + "]";
}
}

View File

@@ -4,7 +4,9 @@ import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.storages.StorageContext;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
@@ -104,8 +106,33 @@ public interface KVStore {
default boolean exists(String key) throws IOException {
return list().stream().anyMatch(kvEntry -> kvEntry.key().equals(key));
}
/**
* Finds a KV entry with associated metadata for a given key.
*
* @param key the KV entry key.
* @return an optional of {@link KVValueAndMetadata}.
*
* @throws UncheckedIOException if an error occurred while executing the operation on the K/V store.
*/
default Optional<KVValueAndMetadata> findMetadataAndValue(final String key) throws UncheckedIOException {
try {
return get(key).flatMap(entry ->
{
try {
return getValue(entry.key()).map(current -> new KVValueAndMetadata(new KVMetadata(entry.description(), entry.expirationDate()), current.value()));
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (ResourceExpiredException e) {
return Optional.empty();
}
}
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
Pattern KEY_VALIDATOR_PATTERN = Pattern.compile("[a-zA-Z0-9][a-zA-Z0-9._-]*");
/**

View File

@@ -18,6 +18,7 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.kestra.core.services.ConditionService;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.kestra.plugin.core.condition.*;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
@@ -175,9 +176,6 @@ public class FlowTopologyService {
protected boolean isTriggerChild(Flow parent, Flow child) {
List<AbstractTrigger> triggers = ListUtils.emptyOnNull(child.getTriggers());
// simulated execution: we add a "simulated" label so conditions can know that the evaluation is for a simulated execution
Execution execution = Execution.newExecution(parent, (f, e) -> null, List.of(SIMULATED_EXECUTION), Optional.empty());
// keep only flow trigger
List<io.kestra.plugin.core.trigger.Flow> flowTriggers = triggers
.stream()
@@ -189,13 +187,16 @@ public class FlowTopologyService {
return false;
}
// simulated execution: we add a "simulated" label so conditions can know that the evaluation is for a simulated execution
Execution execution = Execution.newExecution(parent, (f, e) -> null, List.of(SIMULATED_EXECUTION), Optional.empty());
boolean conditionMatch = flowTriggers
.stream()
.flatMap(flow -> ListUtils.emptyOnNull(flow.getConditions()).stream())
.allMatch(condition -> validateCondition(condition, parent, execution));
boolean preconditionMatch = flowTriggers.stream()
.anyMatch(flow -> flow.getPreconditions() == null || validateMultipleConditions(flow.getPreconditions().getConditions(), parent, execution));
.anyMatch(flow -> flow.getPreconditions() == null || validatePreconditions(flow.getPreconditions(), parent, execution));
return conditionMatch && preconditionMatch;
}
@@ -239,11 +240,24 @@ public class FlowTopologyService {
}
private boolean isMandatoryMultipleCondition(Condition condition) {
return Stream
.of(
Expression.class
)
.anyMatch(aClass -> condition.getClass().isAssignableFrom(aClass));
return condition.getClass().isAssignableFrom(Expression.class);
}
private boolean validatePreconditions(io.kestra.plugin.core.trigger.Flow.Preconditions preconditions, FlowInterface child, Execution execution) {
boolean upstreamFlowMatched = MapUtils.emptyOnNull(preconditions.getUpstreamFlowsConditions())
.values()
.stream()
.filter(c -> !isFilterCondition(c))
.anyMatch(c -> validateCondition(c, child, execution));
boolean whereMatched = MapUtils.emptyOnNull(preconditions.getWhereConditions())
.values()
.stream()
.filter(c -> !isFilterCondition(c))
.allMatch(c -> validateCondition(c, child, execution));
// to be a dependency, if upstream flow is set it must be either inside it so it's a AND between upstream flow and where
return upstreamFlowMatched && whereMatched;
}
private boolean isFilterCondition(Condition condition) {

View File

@@ -206,22 +206,17 @@ public class MapUtils {
/**
* Utility method that flatten a nested map.
* <p>
* NOTE: for simplicity, this method didn't allow to flatten maps with conflicting keys that would end up in different flatten keys,
* this could be related later if needed by flattening {k1: k2: {k3: v1}, k1: {k4: v2}} to {k1.k2.k3: v1, k1.k4: v2} is prohibited for now.
*
* @param nestedMap the nested map.
* @return the flattened map.
*
* @throws IllegalArgumentException if any entry contains a map of more than one element.
*/
public static Map<String, Object> nestedToFlattenMap(@NotNull Map<String, Object> nestedMap) {
Map<String, Object> result = new TreeMap<>();
for (Map.Entry<String, Object> entry : nestedMap.entrySet()) {
if (entry.getValue() instanceof Map<?, ?> map) {
Map.Entry<String, Object> flatten = flattenEntry(entry.getKey(), (Map<String, Object>) map);
result.put(flatten.getKey(), flatten.getValue());
Map<String, Object> flatten = flattenEntry(entry.getKey(), (Map<String, Object>) map);
result.putAll(flatten);
} else {
result.put(entry.getKey(), entry.getValue());
}
@@ -229,18 +224,19 @@ public class MapUtils {
return result;
}
private static Map.Entry<String, Object> flattenEntry(String key, Map<String, Object> value) {
if (value.size() > 1) {
throw new IllegalArgumentException("You cannot flatten a map with an entry that is a map of more than one element, conflicting key: " + key);
private static Map<String, Object> flattenEntry(String key, Map<String, Object> value) {
Map<String, Object> result = new TreeMap<>();
for (Map.Entry<String, Object> entry : value.entrySet()) {
String newKey = key + "." + entry.getKey();
Object newValue = entry.getValue();
if (newValue instanceof Map<?, ?> map) {
result.putAll(flattenEntry(newKey, (Map<String, Object>) map));
} else {
result.put(newKey, newValue);
}
}
Map.Entry<String, Object> entry = value.entrySet().iterator().next();
String newKey = key + "." + entry.getKey();
Object newValue = entry.getValue();
if (newValue instanceof Map<?, ?> map) {
return flattenEntry(newKey, (Map<String, Object>) map);
} else {
return Map.entry(newKey, newValue);
}
return result;
}
}

View File

@@ -0,0 +1,5 @@
package io.kestra.core.utils;
public class RegexPatterns {
public static final String JAVA_IDENTIFIER_REGEX = "^[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*$";
}

View File

@@ -63,7 +63,7 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
List<String> violations = new ArrayList<>();
if (RESERVED_FLOW_IDS.contains(value.getId())) {
if (value.getId() != null && RESERVED_FLOW_IDS.contains(value.getId())) {
violations.add("Flow id is a reserved keyword: " + value.getId() + ". List of reserved keywords: " + String.join(", ", RESERVED_FLOW_IDS));
}

Some files were not shown because too many files have changed in this diff Show More