Compare commits

...

187 Commits

Author SHA1 Message Date
François Delbrayelle
343c66f125 build(release): only release changed plugins 2025-08-12 11:59:15 +02:00
Loïc Mathieu
577f813eef fix(executions): SLA monitor should take into account restarted executions 2025-08-12 11:46:58 +02:00
Loïc Mathieu
06a9f13676 fix(executions): concurrency limit exceeded when restarting an execution
Fixes #7880
2025-08-12 11:46:58 +02:00
Loïc Mathieu
1fd6e23f96 feat(flows): Flow SLA out of beta
Part-of: https://github.com/kestra-io/kestra-ee/issues/4555
2025-08-12 11:29:32 +02:00
Piyush Bhaskar
9a32780c8c fix(flow): fixes flow deletion inside actions (#10693) 2025-08-12 14:56:31 +05:30
Nicolas K.
af140baa66 Feat/add filters to repositories (#10629)
* wip(repositories): use query filter in the log repository

* feat(repositories): #10628 refactor query builder engine

* fix(repositories): #10628 add sort to findAsych query

* Update core/src/main/java/io/kestra/core/utils/ListUtils.java

Co-authored-by: Loïc Mathieu <loikeseke@gmail.com>

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
Co-authored-by: Loïc Mathieu <loikeseke@gmail.com>
2025-08-12 11:17:47 +02:00
Florian Hussonnois
54b0183b95 fix(system): avoid unsupported type error on ServiceType enum 2025-08-12 10:01:30 +02:00
Loïc Mathieu
64de3d5fa8 fix(executions): correctly fail the request when trying to resume an execution with the wrong inputs
Fixes #9959
2025-08-12 09:39:02 +02:00
Piyush Bhaskar
4c17aadb81 fix(ui): more visible color for default edge (#10690) 2025-08-12 12:44:20 +05:30
Piyush Bhaskar
bf424fbf53 fix(core): reduce size of code block text and padding (#10689) 2025-08-12 11:46:52 +05:30
brian.mulier
edcdb88559 fix(dashboard): avoid duplicate dashboard calls + properly refresh dashboards on refresh button + don't discard component entirely on refresh 2025-08-11 22:28:19 +02:00
brian.mulier
9a9d0b995a fix(dashboard): properly use time filters in queries
closes kestra-io/kestra-ee#4389
2025-08-11 22:28:19 +02:00
brian-mulier-p
5c5d313fb0 fix(metrics): restore autocompletion on metrics filter (#10688) 2025-08-11 21:08:56 +02:00
Nicolas K.
dfd4d87867 feat(releases): add test jar to maven central deployment (#10675)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-11 15:56:51 +02:00
Piyush Bhaskar
367d773a86 fix(flows): enable the save and makes tab dirty when have unsaved changes in no code (#10671) 2025-08-11 18:35:56 +05:30
brian.mulier
c819f15c66 tests(core): add a test to taskrunners to ensure it's working multiple times on the same working directory
part of kestra-io/plugin-ee-kubernetes#45
2025-08-11 14:59:15 +02:00
Loïc Mathieu
673b5c994c feat(flows): add upstream dependencies in flow dependencies
Closes #10638
2025-08-11 12:43:33 +02:00
Loïc Mathieu
2acf37e0e6 fix(executions): properly fail the task if it contains unsupported unicode sequence
This occurs in Postgres when using the `\u0000` unicode sequence. Postgres refuses to store any JSONB with this sequence as it has no textual representation.
We now properly detect that and fail the task.

Fixes #10326
2025-08-11 11:53:39 +02:00
Ludovic DEHON
0d7fcbb936 build(core): create a docker image for each pull request (#10644)
relate to kestra-io/kestra#10643
2025-08-09 00:18:28 +02:00
Miloš Paunović
42b01d6951 chore(core): reload number of dependencies on flow save action (#10663)
Closes https://github.com/kestra-io/kestra/issues/10484.
2025-08-08 15:11:41 +02:00
Miloš Paunović
9edfb01920 chore(core): uniform dependency table namespace label (#10655) 2025-08-08 13:14:53 +02:00
Miloš Paunović
7813337f48 fix(core): ensure dependency table updates occur after dom is fully rendered (#10654)
Closes https://github.com/kestra-io/kestra/issues/10639.
2025-08-08 12:52:16 +02:00
Miloš Paunović
ea0342f82a refactor(core): remove revision property from flow nodes in dependency graph (#10650)
Related to https://github.com/kestra-io/kestra/issues/10633.
2025-08-08 12:21:01 +02:00
Piyush Bhaskar
ca8f25108e fix(core): update flow store usage. (#10649) 2025-08-08 11:34:09 +02:00
Miloš Paunović
49b6c331a6 chore(core): amend edge color scheme in execution dependency graph (#10648)
Related to https://github.com/kestra-io/kestra/issues/10639.
2025-08-08 11:29:11 +02:00
Miloš Paunović
e409fb7ac0 chore(core): lower the wheel sensitivity on zooming of dependency graph (#10647)
Relates to https://github.com/kestra-io/kestra/issues/10639.
2025-08-08 10:27:51 +02:00
Miloš Paunović
0b64c29794 fix(flows): properly import pinia store into a dependency graph composable (#10646) 2025-08-08 10:25:58 +02:00
Piyush Bhaskar
c4665460aa fix(flows): copy trigger url properly. (#10645) 2025-08-08 12:57:41 +05:30
Barthélémy Ledoux
5423b6e3a7 refactor: move flow store to pinia (#10620) 2025-08-08 09:04:33 +02:00
Vanshika Kumar
114669e1b5 chore(core): add padding around user image in left sidebar (#10553)
Co-authored-by: Vanshika Kumar <vanshika.kumar-ext@infra.market>
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-08-08 08:34:23 +02:00
Loïc Mathieu
d75f0ced38 fix(executions): allow caching tasks that use the 'workingDir' variable
Fixes #10253
2025-08-07 17:26:24 +02:00
brian.mulier
0a788d8429 fix(core): ensure props with defaults are not marked as required in generated doc 2025-08-07 15:07:00 +02:00
brian.mulier
8c25d1bbd7 fix(core): wrong @NotNull import leading to key not being marked as required
closes #9287
2025-08-07 15:07:00 +02:00
YannC
4e2e8f294f fix: avoid calling nextExecutionDate if value is null when resetting trigger (#10547) 2025-08-07 14:51:27 +02:00
Barthélémy Ledoux
2c34804ce2 fix(core): update necessary node viewer in gradle build (#10624) 2025-08-07 13:38:29 +02:00
Piyush Bhaskar
bab4eef790 refactor(namespace): migrate namespace module to pinia (#10571)
* refactor(namespace): migrate namespace module to pinia

* refactor(namespaces): override the store and fix the test

* fix:  test in good way

* refactor: rename action as ee

* refactor: state and action is different

* refactor:  namespaces store in composition  API and composable to use the common state, actions

* fix: export validate
2025-08-07 16:20:51 +05:30
Miloš Paunović
94aa628ac1 feat(core): implement different graph type for dependencies view (#10240)
Closes https://github.com/kestra-io/kestra/issues/5350.
Closes https://github.com/kestra-io/kestra/issues/10446.
Closes https://github.com/kestra-io/kestra/issues/10563.
Closes https://github.com/kestra-io/kestra-ee/issues/3431.
Closes https://github.com/kestra-io/kestra-ee/issues/4509.

Relates to https://github.com/kestra-io/kestra/issues/10484.
Relates to https://github.com/kestra-io/kestra-ee/issues/3550.
2025-08-07 12:12:12 +02:00
Loïc Mathieu
da180fbc00 chore(system): add a note on MapUtils.nestedToFlattenMap() method 2025-08-07 12:01:31 +02:00
Anna Geller
c7bd592bc7 fix(ai-agent): add prompt suggestion 2025-08-07 10:42:35 +02:00
Florian Hussonnois
693d174960 chore(system): provide a more useful Either utility class
Rewrite and add tests to Either class to be a bit
more useable
2025-08-07 10:31:28 +02:00
Florian Hussonnois
8ee492b9c5 fix(system): fix consumer commit on JDBC queue
Ensure that JDBC queue records are committed to the consumer
after processing. This fixes a rare issue where executions could be blocked after a runner crash.
2025-08-07 10:31:17 +02:00
Loïc Mathieu
d6b8ba34ea chore(system): provide a MapUtils.nestedToFlattenMap() method
It will be used to nest a previously flatten map when needed.
2025-08-07 10:00:13 +02:00
dependabot[bot]
08cc853e00 build(deps): bump software.amazon.awssdk.crt:aws-crt
Bumps [software.amazon.awssdk.crt:aws-crt](https://github.com/awslabs/aws-crt-java) from 0.38.7 to 0.38.8.
- [Release notes](https://github.com/awslabs/aws-crt-java/releases)
- [Commits](https://github.com/awslabs/aws-crt-java/compare/v0.38.7...v0.38.8)

---
updated-dependencies:
- dependency-name: software.amazon.awssdk.crt:aws-crt
  dependency-version: 0.38.8
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-07 09:19:01 +02:00
dependabot[bot]
4f68715483 build(deps): bump org.apache.commons:commons-compress
Bumps [org.apache.commons:commons-compress](https://github.com/apache/commons-compress) from 1.27.1 to 1.28.0.
- [Changelog](https://github.com/apache/commons-compress/blob/master/RELEASE-NOTES.txt)
- [Commits](https://github.com/apache/commons-compress/compare/rel/commons-compress-1.27.1...rel/commons-compress-1.28.0)

---
updated-dependencies:
- dependency-name: org.apache.commons:commons-compress
  dependency-version: 1.28.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-07 09:18:02 +02:00
Karthik D
edde1b6730 fix(core): fixes overflow of outputs content
* fix

* fix

* fix: minor tweaks

* fix: scope the style

---------

Co-authored-by: Piyush-r-bhaskar <impiyush0012@gmail.com>
2025-08-07 12:37:44 +05:30
Biplab Bera
399446f52e feat: disabled the preview button in output tabs for zip files (#10535)
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
2025-08-07 11:56:58 +05:30
Florian Hussonnois
c717890fbc fix(build): fix and enhance release-plugins.sh
Skip gradle release when tag already exists
Check for staging files before committing
2025-08-06 17:17:50 +02:00
Barthélémy Ledoux
5328b0c574 fix(flows): allow date inputs in playground (#10611) 2025-08-06 15:36:29 +02:00
Barthélémy Ledoux
de14cae1f0 fix(flows): playground only clear highlighted lines on leave task (#10612) 2025-08-06 15:36:17 +02:00
Miloš Paunović
d8a3e703e7 feat(core): add animated edges to topology graph (#10616)
Closes kestra-io/kestra#10614.
2025-08-06 14:49:31 +02:00
dependabot[bot]
90659bc320 build(deps): bump com.azure:azure-sdk-bom from 1.2.36 to 1.2.37
Bumps [com.azure:azure-sdk-bom](https://github.com/azure/azure-sdk-for-java) from 1.2.36 to 1.2.37.
- [Release notes](https://github.com/azure/azure-sdk-for-java/releases)
- [Commits](https://github.com/azure/azure-sdk-for-java/compare/azure-sdk-bom_1.2.36...azure-sdk-bom_1.2.37)

---
updated-dependencies:
- dependency-name: com.azure:azure-sdk-bom
  dependency-version: 1.2.37
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 12:55:33 +02:00
dependabot[bot]
37d1d8856e build(deps): bump software.amazon.awssdk:bom from 2.32.11 to 2.32.16
Bumps software.amazon.awssdk:bom from 2.32.11 to 2.32.16.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.32.16
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 11:56:59 +02:00
Florian Hussonnois
93a4eb5cbc build: add plugin-datagen to plugin list 2025-08-06 11:11:46 +02:00
Miloš Paunović
de160c8a2d chore(deps): regular dependency update (#10607)
Performing a weekly round of dependency updates in the NPM ecosystem to keep everything up to date.
2025-08-06 10:20:32 +02:00
dependabot[bot]
28458b59eb build(deps): bump com.mysql:mysql-connector-j from 9.3.0 to 9.4.0
Bumps [com.mysql:mysql-connector-j](https://github.com/mysql/mysql-connector-j) from 9.3.0 to 9.4.0.
- [Changelog](https://github.com/mysql/mysql-connector-j/blob/release/9.x/CHANGES)
- [Commits](https://github.com/mysql/mysql-connector-j/compare/9.3.0...9.4.0)

---
updated-dependencies:
- dependency-name: com.mysql:mysql-connector-j
  dependency-version: 9.4.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:50:39 +02:00
dependabot[bot]
2a256d9505 build(deps): bump org.eclipse.angus:jakarta.mail from 2.0.3 to 2.0.4
Bumps org.eclipse.angus:jakarta.mail from 2.0.3 to 2.0.4.

---
updated-dependencies:
- dependency-name: org.eclipse.angus:jakarta.mail
  dependency-version: 2.0.4
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:50:20 +02:00
dependabot[bot]
9008b21007 build(deps): bump com.google.cloud:libraries-bom from 26.64.0 to 26.65.0
Bumps [com.google.cloud:libraries-bom](https://github.com/googleapis/java-cloud-bom) from 26.64.0 to 26.65.0.
- [Release notes](https://github.com/googleapis/java-cloud-bom/releases)
- [Changelog](https://github.com/googleapis/java-cloud-bom/blob/main/release-please-config.json)
- [Commits](https://github.com/googleapis/java-cloud-bom/compare/v26.64.0...v26.65.0)

---
updated-dependencies:
- dependency-name: com.google.cloud:libraries-bom
  dependency-version: 26.65.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:49:35 +02:00
dependabot[bot]
8c13bf6a71 build(deps): bump com.gradleup.shadow from 8.3.8 to 8.3.9
Bumps [com.gradleup.shadow](https://github.com/GradleUp/shadow) from 8.3.8 to 8.3.9.
- [Release notes](https://github.com/GradleUp/shadow/releases)
- [Commits](https://github.com/GradleUp/shadow/compare/8.3.8...8.3.9)

---
updated-dependencies:
- dependency-name: com.gradleup.shadow
  dependency-version: 8.3.9
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:49:06 +02:00
dependabot[bot]
43888cc3dd build(deps): bump actions/download-artifact from 4 to 5
Bumps [actions/download-artifact](https://github.com/actions/download-artifact) from 4 to 5.
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](https://github.com/actions/download-artifact/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/download-artifact
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:48:47 +02:00
Piyush Bhaskar
c94093d9f6 fix(flows): ensure plugin documentation change on flow switch (#10546)
Co-authored-by: Barthélémy Ledoux <bledoux@kestra.io>
2025-08-05 14:29:36 +05:30
Barthélémy Ledoux
8779dec28a fix(flows): add conditional rendering for restart button based on execution (#10570) 2025-08-05 10:22:13 +02:00
Nicolas K.
41614c3a6e feat(stores): #4353 list all KV for namespace and parent namespaces (#10470)
* feat(stores): #4353 list all KV for namespace and parent namespaces

* feat(stores): #4353 list all KV for namespace and parent namespaces

* feat(stores): #4353 list all KV for namespace and parent namespaces

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-05 09:55:41 +02:00
Barthélémy Ledoux
6b4fdd0688 fix: restore InputForm (#10568) 2025-08-05 09:44:39 +02:00
Loïc Mathieu
0319f3d267 feat(system): set the default number of worker threads to 8x available cpu cores
This is a better default for mixed workloads and provides better tail latency.
This is also what we advise to our customers.
2025-08-05 09:19:14 +02:00
brian.mulier
0b37fe2cb8 fix(namespaces): autocomplete in kv & secrets
related to kestra-io/kestra-ee#4559
2025-08-04 20:29:56 +02:00
brian.mulier
e623dd7729 fix(executions): avoid SSE error in follow execution dependencies
closes #10560
2025-08-04 20:22:32 +02:00
Barthélémy Ledoux
db4f7cb4ff fix(flows)*: load flow for execution needs to be stored most of the time (#10566) 2025-08-04 18:54:01 +02:00
Abhilash T
b14b16db0e fix: Updated InputsForm.vue to clear Radio Button Selection (#9654)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-08-04 16:03:25 +02:00
brian.mulier
77f6cec0e4 fix(executions): restore execution redirect & subflow logs view from parent
closes #10528
closes #10551
2025-08-04 15:46:48 +02:00
Piyush Bhaskar
1748b18d66 chore(core): remove variable and directly assign. (#10554) 2025-08-04 18:45:19 +05:30
Piyush Bhaskar
32f96348c1 fix(core): proper state detection from parsed data (#10527) 2025-08-04 18:41:05 +05:30
Barthélémy Ledoux
07db0a8c80 fix(flows): no-code - when changing type message avoid warning (#10498) 2025-08-04 14:57:28 +02:00
Barthélémy Ledoux
2035fd42c3 refactor: use composition api and ts on revision component (#10529) 2025-08-04 14:56:36 +02:00
Barthélémy Ledoux
2856bf07e8 refactor: move editor from vuex to pinia (#10533)
Co-authored-by: Piyush-r-bhaskar <impiyush0012@gmail.com>
2025-08-04 14:55:55 +02:00
Barthélémy Ledoux
f5327cec33 fix: remove debugging value from playground (#10541) 2025-08-04 14:54:45 +02:00
Anna Geller
42955936b2 fix: demo no longer exists 2025-08-04 14:38:13 +02:00
Miloš Paunović
771b98e023 chore(namespaces): add the needed prop for loading all namespaces inside a selector (#10544) 2025-08-04 12:44:38 +02:00
Miloš Paunović
b80e8487e3 fix(namespaces): amend problems with namespace secrets and kv pairs (#10543)
Closes https://github.com/kestra-io/kestra-ee/issues/4584.
2025-08-04 12:19:52 +02:00
YannC.
f35a0b6d60 fix: add missing webhook releases secrets for github releases 2025-08-01 23:21:27 +02:00
brian.mulier
0c9ed17f1c fix(core): remove icon for inputs in no-code
closes #10520
2025-08-01 16:32:08 +02:00
brian.mulier
7ca20371f8 fix(executions): avoid race condition leading to never-ending follow with non-terminal state 2025-08-01 13:12:14 +02:00
brian.mulier
8ff3454cbd fix(core): ensure instances can read all messages when no consumer group / queue type 2025-08-01 13:12:14 +02:00
Piyush Bhaskar
09593d9fd2 fix(namespaces): fixes loading of additional ns (#10518) 2025-08-01 16:28:01 +05:30
Loïc Mathieu
d3cccf36f0 feat(flow): pull up description to the FlowInterface
This avoids the need to parse the flow, e.g. by AI, to get the description.
2025-08-01 12:43:49 +02:00
Loïc Mathieu
eeb91cd9ed fix(tests): RunContextLoggerTest.secrets(): wrong number of logs in awaitLogs() 2025-08-01 12:41:41 +02:00
Loïc Mathieu
2679b0f067 feat(flows): warn on runnable only properties on non-runnable tasks
Closes #9967
Closes #10500
2025-08-01 12:41:08 +02:00
Piyush Bhaskar
54281864c8 fix(executions): do not rely on monaco to get value (#10515) 2025-08-01 13:23:43 +05:30
Loïc Mathieu
e4f9b11d0c fix(ci): workflow build artifact doesn't need the plugin version 2025-08-01 09:41:48 +02:00
Barthélémy Ledoux
12cef0593c fix(flows): playground need to use ui-libs (#10506) 2025-08-01 09:06:11 +02:00
Piyush Bhaskar
c6cf8f307f fix(flows): route to flow page (#10514) 2025-08-01 12:10:56 +05:30
Piyush Bhaskar
3b4eb55f84 fix(executions): properly handle methods and computed for tabs (#10513) 2025-08-01 12:10:27 +05:30
YannC
d32949985d fix: handle empty flows list in lastExecutions correctly (#10493) 2025-08-01 07:21:00 +02:00
YannC
c051ca2e66 fix(ui): load correctly filters + refresh dashboard on filter change (#10504) 2025-08-01 07:15:46 +02:00
Piyush Bhaskar
93a456963b fix(editor): adjust padding for editor (#10497)
* fix(editor): adjust padding for editor

* fix: make padding 16px
2025-07-31 19:10:46 +05:30
YannC.
9a45f17680 fix(ci): do not run github release on tag 2025-07-31 14:37:51 +02:00
github-actions[bot]
5fb6806d74 chore(core): localize to languages other than english (#10494)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 17:44:10 +05:30
Barthélémy Ledoux
f3cff72edd fix(flows): forget all old taskRunId when a new execution (#10487) 2025-07-31 13:41:57 +02:00
Barthélémy Ledoux
0abc660e7d fix(flows): wait longer for widgets to be rendered (#10485) 2025-07-31 13:41:46 +02:00
Barthélémy Ledoux
f09ca3d92e fix(flows): load flows documentation when coming back to no-code root (#10374) 2025-07-31 13:41:36 +02:00
YannC
9fd778fca1 feat(ui): added http method autocompletion (#10492) 2025-07-31 13:28:59 +02:00
Loïc Mathieu
667af25e1b fix(executions): Don't create outputs from the Subflow task when we didn't wait
As, well, if we didn't wait for the subflow execution, we cannot have access to its outputs.
2025-07-31 13:06:58 +02:00
github-actions[bot]
1b1aed5ff1 chore(core): localize to languages other than english (#10489)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 12:14:37 +02:00
Barthélémy Ledoux
da1bb58199 fix(flows): add the load errors to the flow errors (#10483) 2025-07-31 11:53:43 +02:00
Loïc Mathieu
d3e661f9f8 feat(system): improve performance of computeSchedulable
- Store flowIds in a list to avoid computing them multiple times
- Store triggers by ID in a map to avoid iterating the list of triggers for each flow
2025-07-31 11:35:01 +02:00
yuri1969
2126c8815e feat(core): validate URL configuration
Used the `ServerCommandValidator` style.

BREAKING CHANGE: app won't start due to an invalid `kestra.url`
2025-07-31 11:24:21 +02:00
yuri1969
6cfc5b8799 fix(build): reduce Gradle warnings 2025-07-31 11:21:01 +02:00
Barthélémy Ledoux
16d44034f0 fix(flows): hide executionkind meta in the logs (#10482) 2025-07-31 10:50:34 +02:00
Barthélémy Ledoux
f76e62a4af fix(executions): do not rely on monaco to get value (#10467) 2025-07-31 09:28:33 +02:00
Piyush Bhaskar
f6645da94c fix(core): remove top spacing from no execution page and removing the redundant code (#10445)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-31 12:03:58 +05:30
github-actions[bot]
93b2bbf0d0 chore(core): localize to languages other than english (#10471)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 08:23:08 +02:00
Piyush Bhaskar
9d46e2aece fix(executions): make columns that are not links normal text (#10460)
* fix(executions): make it normal text

* fix(executions): use monospace font only
2025-07-31 10:33:33 +05:30
brian.mulier
133315a2a5 chore(deps): hardcode vue override version 2025-07-30 19:25:50 +02:00
brian.mulier
b96b9bb414 fix(core): avoid follow execution from being discarded too early
closes #10472
closes #7623
2025-07-30 19:25:50 +02:00
Barthélémy Ledoux
9865d8a7dc fix(flows): playground - implement new designs (#10459)
Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-07-30 17:54:46 +02:00
brian-mulier-p
29f22c2f81 fix(core): redesign playground run task button (#10423)
closes #10389
2025-07-30 15:23:33 +02:00
dependabot[bot]
3e69469381 build(deps): bump net.thisptr:jackson-jq from 1.3.0 to 1.4.0
Bumps [net.thisptr:jackson-jq](https://github.com/eiiches/jackson-jq) from 1.3.0 to 1.4.0.
- [Release notes](https://github.com/eiiches/jackson-jq/releases)
- [Commits](https://github.com/eiiches/jackson-jq/compare/1.3.0...1.4.0)

---
updated-dependencies:
- dependency-name: net.thisptr:jackson-jq
  dependency-version: 1.4.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-07-30 15:08:39 +02:00
dependabot[bot]
38c24ccf7f build(deps): bump software.amazon.awssdk:bom from 2.32.6 to 2.32.11
Bumps software.amazon.awssdk:bom from 2.32.6 to 2.32.11.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.32.11
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-07-30 15:07:49 +02:00
Loïc Mathieu
12cf41a309 fix(ci): don't publish docker in build-artifact 2025-07-30 14:42:16 +02:00
Malaydewangan09
7b8ea0d885 feat(plugins): add script plugins 2025-07-30 17:27:48 +05:30
Barthélémy Ledoux
cf88bbcb12 fix(flows): playground align restart button button (#10415) 2025-07-30 11:57:24 +02:00
Loïc Mathieu
6abe7f96e7 fix(ci): add missing build artifact job 2025-07-30 11:47:10 +02:00
Loïc Mathieu
e73ac78d8b build(ci): allow downloading the exe from the workflow and not the release
This would allow running the workflow even if the release step fail
2025-07-30 11:23:43 +02:00
François Delbrayelle
b0687eb702 fix(): fix icons 2025-07-30 10:28:10 +02:00
weibo1
85f9070f56 feat: Trigger Initialization Method Performance Optimization 2025-07-30 09:23:48 +02:00
YannC
0a42ab40ec fix(dashboard): pageSize & pageNumber is now correctly pass when fetching a chart (#10413) 2025-07-30 08:45:20 +02:00
Piyush Bhaskar
856d2d1d51 refactor(flows): remove execution chart (#10425) 2025-07-30 11:54:35 +05:30
YannC.
a7d6dbc8a3 feat(ci): allow to run github release ci on dispatch 2025-07-29 15:04:50 +02:00
YannC.
cf82109da6 fix(ci): correctly pass GH token to release workflow 2025-07-29 15:01:36 +02:00
Barthélémy Ledoux
d4168ba424 fix(flows): playground clear current execution when clearExecutions() (#10414) 2025-07-29 14:43:11 +02:00
Loïc Mathieu
46a294f25a chore(version): upgrade to v1.0.0-SNAPSHOT 2025-07-29 14:23:19 +02:00
Loïc Mathieu
a229036d8d chore(version): update to version 'v0.24.0-rc0-SNAPSHOT'. 2025-07-29 14:21:49 +02:00
François Delbrayelle
a518fefecd feat(plugins): add plugin-deepseek 2025-07-29 13:58:11 +02:00
Barthélémy Ledoux
1d3210fd7d fix(flows): remove text from warning button (#10370) 2025-07-29 11:27:37 +02:00
brian-mulier-p
597f84ecb7 fix(core): topology was no longer working on new flows (#10411)
closes #10354
2025-07-29 11:19:05 +02:00
Barthélémy Ledoux
5f3c7ac9f0 fix(core): allow icons api call to take longer than local call (#10412) 2025-07-29 11:13:12 +02:00
Nicolas K.
77c4691b04 fix(tests): rework basic auth service test (#10409)
* fix(tests): rework basic auth service test

* fix(tests): clean basic auth service test

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-29 10:59:43 +02:00
github-actions[bot]
6d34416529 chore(core): localize to languages other than english (#10410)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-29 13:55:25 +05:30
Piyush Bhaskar
40a67d5dcd feat(flows): add webhook curl and json field (#10392) 2025-07-29 13:48:28 +05:30
Loïc Mathieu
2c68c704f6 fix(test): flaky test JdbcQueueTest.withGroupAndType() 2025-07-29 09:35:28 +02:00
Miloš Paunović
e59d9f622c chore(namespaces): properly handle file name field on flow run dialog if set from defaults (#10390)
Closes https://github.com/kestra-io/kestra/issues/10365.
2025-07-29 08:29:08 +02:00
Piyush Bhaskar
c951ba39a7 fix(core): make validation less aggressive (#10406) 2025-07-29 11:41:38 +05:30
github-actions[bot]
a0200cfacb chore(core): localize to languages other than english (#10405)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-28 20:50:23 +02:00
dependabot[bot]
c6310f0697 build(deps): bump com.github.ben-manes.caffeine:caffeine
Bumps [com.github.ben-manes.caffeine:caffeine](https://github.com/ben-manes/caffeine) from 3.2.1 to 3.2.2.
- [Release notes](https://github.com/ben-manes/caffeine/releases)
- [Commits](https://github.com/ben-manes/caffeine/compare/v3.2.1...v3.2.2)

---
updated-dependencies:
- dependency-name: com.github.ben-manes.caffeine:caffeine
  dependency-version: 3.2.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-07-28 17:28:31 +02:00
dependabot[bot]
21ba59a525 build(deps): bump org.postgresql:postgresql in the gradle group
Bumps the gradle group with 1 update: [org.postgresql:postgresql](https://github.com/pgjdbc/pgjdbc).


Updates `org.postgresql:postgresql` from 42.7.6 to 42.7.7
- [Release notes](https://github.com/pgjdbc/pgjdbc/releases)
- [Changelog](https://github.com/pgjdbc/pgjdbc/blob/master/CHANGELOG.md)
- [Commits](https://github.com/pgjdbc/pgjdbc/compare/REL42.7.6...REL42.7.7)

---
updated-dependencies:
- dependency-name: org.postgresql:postgresql
  dependency-version: 42.7.7
  dependency-type: direct:production
  dependency-group: gradle
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-07-28 17:26:23 +02:00
Barthélémy Ledoux
4f9e3cd06c fix(flows): bring back alignment on overview (#10369) 2025-07-28 17:22:55 +02:00
Barthélémy Ledoux
e74010d1a4 fix(flows): playground - use all tasks as breakpoints (#10399) 2025-07-28 17:22:23 +02:00
Barthélémy Ledoux
465e6467e9 chore: update ui-libs (#10391) 2025-07-28 17:12:56 +02:00
YannC
c68c1b16d9 fix: set postgres and mysql queue offset as a bigint (#10344) 2025-07-28 16:28:09 +02:00
YannC
468c32156e chore: enforce micronaut open api version until Micronaut Platform works (#10393) 2025-07-28 16:27:59 +02:00
Loïc Mathieu
6e0a1c61ef fix(tests): increase the sleep inside ExecutionControllerRunnerTest.triggerExecutionAndFollowDependencies()
I'm not happy with that but I ran 3x 100 repetitions and all passed
2025-07-28 16:23:40 +02:00
Loïc Mathieu
552d55ef6b fix(test): RestactCaseTest.restartFailedWithFinally() should use executionService.isTerminated() 2025-07-28 16:23:40 +02:00
skayliu
08b0b682bf refactor(pebble): add more timestamp data time format 2025-07-28 16:17:55 +02:00
ben8t
cff90c93bb feat(plugin): add Notion plugin (#10049) 2025-07-28 16:09:01 +02:00
Roman Acevedo
ea465056d0 fix(triggers): bulk action on triggers did not take into account this is async (#10307) 2025-07-28 15:21:03 +02:00
Piyush Bhaskar
02f150f0b0 fix(core): animation on ai agent button (#10379)
* fix(core): animation on ai agent button

* refactor(ai): add AITriggerButton component.
2025-07-28 18:48:34 +05:30
Loïc Mathieu
95d95d3d3c fix(tests): fix assertions in DateFilterTest.now() 2025-07-28 14:51:41 +02:00
Nicolas K.
6b8d3d6928 fix(tests): flaky test with wire mock not starting fast enough (#10383)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-28 14:07:48 +02:00
Piyush Bhaskar
1e347073ca fix(plugins): go to list of plugins from sidebar (#10385)
* fix(plugins): go to list of plugins from sidebar

* fix: update route to use path.
2025-07-28 17:24:09 +05:30
Loïc Mathieu
ac09dcecd9 fix(flows): wrong warning in FILE input
As we deprecate the `extension` property but still have a default for it, the warning is always displayed.
Removing the default and applying only when needed fixes the issue.

Fixes #10361
2025-07-28 13:40:11 +02:00
Miloš Paunović
40b337cd22 fix(namespaces): make sure the namespace parameter is properly passed when reading a file (#10384)
Relates to https://github.com/kestra-io/kestra/issues/10363.
Relates to https://github.com/kestra-io/kestra-ee/issues/4514.
2025-07-28 12:35:44 +02:00
Karthik D
5377d16036 chore(plugins): simplify the plugins search field (#10373)
Closes https://github.com/kestra-io/kestra/issues/10300.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-28 12:15:08 +02:00
Barthélémy Ledoux
f717bc413f tests: update frontend-tests (#10380) 2025-07-28 12:02:32 +02:00
Loïc Mathieu
d6bed2d235 fix(flows): file watching flow should delete the flow when the file is deleted while reading for update
When you use flow watching, a file can be updated then deleted, if it occurs quickly you could have a modified event, read the file then have a file not found exception as it has been deleted.
We should delete the flow in this case.

This has been detected because the related tests are flaky; doing that reduces the flakiness of the related tests
2025-07-28 12:00:30 +02:00
Loïc Mathieu
07fd74b238 fix(test): flaky ExecutionControllerRunnerTest.triggerExecutionAndFollowDependencies() 2025-07-28 11:50:57 +02:00
Loïc Mathieu
60eef29de2 fix(tests): await for terminated execution instead of sleep in ExecutionControllerRunnerTest.killByIdShouldFailed() 2025-07-28 11:50:57 +02:00
Loïc Mathieu
20ca7b6380 fix(tests): increase wait time to 10 in LogConsumerTests.logs()
This should reduce flakiness.
2025-07-28 11:50:57 +02:00
Piyush Bhaskar
9d82df61c6 Revert "fix(core): fixes cumbersome operator selection (#10322)" (#10377)
This reverts commit e78210b5eb.
2025-07-28 14:52:58 +05:30
Piyush Bhaskar
e78210b5eb fix(core): fixes cumbersome operator selection (#10322) 2025-07-28 14:29:42 +05:30
Miloš Paunović
83143fae83 fix(executions): add default icons to execution dependency view (#10375)
Closes https://github.com/kestra-io/kestra/issues/10327.
2025-07-28 10:58:12 +02:00
Malay Dewangan
25f5ccc6b5 fix(runner): support floating-point CPU values in Docker runner (#10366) 2025-07-28 14:21:23 +05:30
Piyush Bhaskar
cf3e49a284 fix(core): give height to tooltip and use ks tokens (#10372) 2025-07-28 13:49:42 +05:30
JAGADEESH E
9a72d378df chore(core): amend missing property causing console warning in the settings page (#9788)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-28 09:49:14 +02:00
Loïc Mathieu
752a927fac feat(logs): allow LogShipper to filters on flowId and executionId
Closes https://github.com/kestra-io/kestra-ee/issues/4468
2025-07-28 09:40:36 +02:00
yuri
4053392921 fix(core): amend misc label-related issues (#10044)
* fix(core): amend misc label-related issues

* re-enabled bulk update of label value
* re-enabled merging flow-execution labels by key
* made duplicated keys rejection readable
* forced multiple validations within `RequestUtils`
* ensured existing labels can be overridden
* added multiple tests validating complex scenarios

BREAKING CHANGE: switched from first to last label value override
BREAKING CHANGE: preventing empty key/value labels
BREAKING CHANGE: preventing whitespace in key

* fix(core): reflect feedback

* Deduplicated a list inside the `Labels` task.
* Worked around label mutation at `Worker`.
* Attempted to deduplicate labels within `Execution` as possible.

* fix(core): remove irrelevant changes
2025-07-28 09:38:38 +02:00
Barthélémy Ledoux
8b0483643a feat(flows): code editor can launch playground (#10359) 2025-07-28 09:15:39 +02:00
Piyush Bhaskar
5feeb41c7a fix(core): update state count emission and filter table executions. (#10367) 2025-07-28 12:42:20 +05:30
github-actions[bot]
d7f5e5c05d chore(core): localize to languages other than english (#10368)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-28 09:09:32 +02:00
Aditya
4840f723fc chore(core): properly handle environment name set either via config and through the settings page (#10151)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-28 09:06:34 +02:00
Devesh Kumar
8cf159b281 fix(namespaces): prevent namespace folder highlighting when containing file is selected (#10364)
Closes https://github.com/kestra-io/kestra/issues/10360.
2025-07-28 08:47:25 +02:00
Loïc Mathieu
4c79576113 fix(tests): improve JdbcQueueTest flaky tests 2025-07-25 12:50:27 +02:00
Florian Hussonnois
f87f2ed753 fix(system): avoid potential NPE in ServiceLivenessManager (#10338)
Avoid a potential NPE in ServiceLivenessManager when
a local service is unregistered during shutdown before the liveness probe completes

Fix: #10338
2025-07-25 12:33:18 +02:00
Florian Hussonnois
298a6c7ca8 fix(system): ignore state transition failure for indexer
Fix: kestra-io/kestra-ee#4474
2025-07-25 11:35:09 +02:00
Loïc Mathieu
ab464fff6e fix(executions): flow concurrency limit not honored when executions are created at a high rate
This is due to the fact that we now process the execution queue concurrently so there is a race when counting currently running executions. This can be seen easily using a ForEachItem as it could create tens or hundreds of executions almost instantly leading to almost all those executions started as they would all see 0 executions running...

Using a dedicated execution running queue, as done in EE, would serialize the messages and fix the issue.

However, if using multiple executor instances and concurrency limit = 1, there is a theoretical race as no locks will be done if no execution is running. A max surge of executions could be as high as the number of executor but this race is less probable to happen in real world scenario.

Fixes #10167
2025-07-25 11:35:00 +02:00
Florian Hussonnois
6dcba16314 chore(core): clean QueryFilter class 2025-07-25 11:34:09 +02:00
Barthélémy Ledoux
80a328e87e fix(flows): better loading pattern (#10345) 2025-07-25 10:14:07 +02:00
Loïc Mathieu
f2034f4975 fix(executions): race condition inside nested ForEach with concurrency
Fixes #10167
2025-07-25 09:45:29 +02:00
github-actions[bot]
edca56d168 chore(core): localize to languages other than english (#10341)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-24 23:21:03 +02:00
299 changed files with 10142 additions and 5469 deletions

View File

@@ -1,20 +1,28 @@
name: 'Load Kestra Plugin List'
description: 'Composite action to load list of plugins'
description: 'Composite action to load list of plugins (from .plugins) and output repositories and GA coordinates'
inputs:
plugin-version:
description: "Kestra version"
description: "Kestra version placeholder to replace LATEST in GA coordinates"
default: 'LATEST'
required: true
plugin-file:
description: "File of the plugins"
description: "Path to the .plugins file"
default: './.plugins'
required: true
include:
description: "Regex include filter applied on repository names"
required: false
default: ''
exclude:
description: "Regex exclude filter applied on repository names"
required: false
default: ''
outputs:
plugins:
description: "List of all Kestra plugins"
description: "Space-separated list of GA coordinates (group:artifact:version)"
value: ${{ steps.plugins.outputs.plugins }}
repositories:
description: "List of all Kestra repositories of plugins"
description: "Space-separated list of repository names (e.g., plugin-ai plugin-airbyte)"
value: ${{ steps.plugins.outputs.repositories }}
runs:
using: composite
@@ -23,7 +31,35 @@ runs:
id: plugins
shell: bash
run: |
PLUGINS=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | sed -e "s/LATEST/${{ inputs.plugin-version }}/g" | cut -d':' -f2- | xargs || echo '');
REPOSITORIES=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | cut -d':' -f1 | uniq | sort | xargs || echo '')
echo "plugins=$PLUGINS" >> $GITHUB_OUTPUT
echo "repositories=$REPOSITORIES" >> $GITHUB_OUTPUT
set -euo pipefail
# Read only uncommented lines that contain io.kestra.* coordinates.
# This avoids the previous approach that 'uncommented' lines by stripping the first char after '#'.
if [[ -f "${{ inputs.plugin-file }}" ]]; then
ENABLED_LINES=$(grep -E '^\s*[^#]' "${{ inputs.plugin-file }}" | grep "io\.kestra\." || true)
else
ENABLED_LINES=""
fi
# Build GA coordinates by replacing LATEST with the provided plugin-version (if present)
PLUGINS=$(echo "$ENABLED_LINES" \
| sed -e "s/LATEST/${{ inputs.plugin-version }}/g" \
| cut -d':' -f2- \
| xargs || echo '')
# Extract repository names (first column), unique + sorted
REPOSITORIES=$(echo "$ENABLED_LINES" \
| cut -d':' -f1 \
| uniq | sort \
| xargs || echo '')
# Apply include/exclude filters if provided (POSIX ERE via grep -E)
if [ -n "${{ inputs.include }}" ] && [ -n "$REPOSITORIES" ]; then
REPOSITORIES=$(echo "$REPOSITORIES" | xargs -n1 | grep -E "${{ inputs.include }}" | xargs || true)
fi
if [ -n "${{ inputs.exclude }}" ] && [ -n "$REPOSITORIES" ]; then
REPOSITORIES=$(echo "$REPOSITORIES" | xargs -n1 | grep -Ev "${{ inputs.exclude }}" | xargs || true)
fi
echo "plugins=$PLUGINS" >> "$GITHUB_OUTPUT"
echo "repositories=$REPOSITORIES" >> "$GITHUB_OUTPUT"

View File

@@ -20,6 +20,15 @@ on:
required: false
type: string
default: "LATEST"
force-download-artifact:
description: 'Force download artifact'
required: false
type: string
default: "true"
options:
- "true"
- "false"
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
jobs:
@@ -38,9 +47,18 @@ jobs:
id: plugins
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
# ********************************************************************************************************************
# Build
# ********************************************************************************************************************
build-artifacts:
name: Build Artifacts
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
uses: ./.github/workflows/workflow-build-artifacts.yml
docker:
name: Publish Docker
needs: [ plugins ]
needs: [ plugins, build-artifacts ]
runs-on: ubuntu-latest
strategy:
matrix:
@@ -73,14 +91,27 @@ jobs:
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# Download release
- name: Download release
# [workflow_dispatch]
# Download executable from GitHub Release
- name: Artifacts - Download release (workflow_dispatch)
id: download-github-release
if: github.event_name == 'workflow_dispatch' && github.event.inputs.force-download-artifact == 'false'
uses: robinraju/release-downloader@v1.12
with:
tag: ${{steps.vars.outputs.tag}}
fileName: 'kestra-*'
out-file-path: build/executable
# [workflow_call]
# Download executable from artifact
- name: Artifacts - Download executable
if: github.event_name != 'workflow_dispatch' || steps.download-github-release.outcome == 'skipped'
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra

View File

@@ -15,24 +15,111 @@ on:
description: 'Use DRY_RUN mode'
required: false
default: 'false'
type: choice
options: ['false', 'true']
repositories:
description: 'Space-separated repo names to release (e.g. "plugin-ai plugin-airbyte"). If empty, uses .plugins.'
required: false
type: string
include:
description: 'Regex include filter on repo names (applied when using .plugins)'
required: false
type: string
exclude:
description: 'Regex exclude filter on repo names (applied when using .plugins)'
required: false
type: string
onlyChanged:
description: 'Release only repos changed since last tag (or sinceTag if provided)'
required: false
default: 'false'
type: choice
options: ['false', 'true']
sinceTag:
description: 'Optional tag used as base for change detection (e.g. v0.24.0)'
required: false
type: string
jobs:
release:
name: Release plugins
prepare:
name: Compute target repositories
runs-on: ubuntu-latest
outputs:
matrix: ${{ steps.compute.outputs.matrix }}
steps:
# Checkout
# Checkout the current repo (assumed to contain .plugins and the workflow)
- uses: actions/checkout@v4
with:
fetch-depth: 0
# Checkout GitHub Actions
# Checkout the kestra-io/actions repo (for setup-build, etc.)
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- name: Install tools
run: sudo apt-get update && sudo apt-get install -y jq
# Load repositories from .plugins (only uncommented lines) with optional include/exclude filters
- name: Get Plugins List
id: plugins-list
uses: ./.github/actions/plugins-list
with:
plugin-version: 'LATEST'
plugin-file: './.plugins'
include: ${{ github.event.inputs.include }}
exclude: ${{ github.event.inputs.exclude }}
# Finalize repo list:
# - If "repositories" input is provided, it takes precedence.
# - Otherwise, use the filtered list from the composite action.
- name: Build repo list
id: build-list
shell: bash
env:
INP_REPOS: ${{ github.event.inputs.repositories }}
run: |
set -euo pipefail
if [ -n "${INP_REPOS:-}" ]; then
LIST="${INP_REPOS}"
else
LIST="${{ steps.plugins-list.outputs.repositories }}"
fi
# Convert to JSON array for matrix
arr_json=$(printf '%s\n' $LIST | jq -R . | jq -s .)
echo "list=$LIST" >> "$GITHUB_OUTPUT"
echo "arr_json=$arr_json" >> "$GITHUB_OUTPUT"
- name: Compute matrix
id: compute
shell: bash
run: |
set -euo pipefail
echo "matrix={\"repo\": ${{ steps.build-list.outputs.arr_json }}}" >> "$GITHUB_OUTPUT"
release:
name: Release ${{ matrix.repo }}
needs: [prepare]
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix: ${{ fromJson(needs.prepare.outputs.matrix) }}
steps:
# Checkout the current repo (for dev-tools/release-plugins.sh)
- uses: actions/checkout@v4
with:
fetch-depth: 0
# Checkout the kestra-io/actions repo (for setup-build, etc.)
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Build toolchain used by plugin builds
- uses: ./actions/.github/actions/setup-build
id: build
with:
@@ -41,42 +128,45 @@ jobs:
python-enabled: true
caches-enabled: true
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins-list
with:
plugin-version: 'LATEST'
- name: 'Configure Git'
- name: Configure Git
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
# Execute
- name: Run Gradle Release
if: ${{ github.event.inputs.dryRun == 'false' }}
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
--yes \
${{ steps.plugins-list.outputs.repositories }}
chmod +x ./dev-tools/release-plugins.sh
ARGS=()
ARGS+=(--release-version="${{ github.event.inputs.releaseVersion }}")
ARGS+=(--next-version="${{ github.event.inputs.nextVersion }}")
ARGS+=(--yes)
if [ "${{ github.event.inputs.onlyChanged }}" = "true" ]; then
ARGS+=(--only-changed)
fi
if [ -n "${{ github.event.inputs.sinceTag }}" ]; then
ARGS+=(--since-tag="${{ github.event.inputs.sinceTag }}")
fi
./dev-tools/release-plugins.sh "${ARGS[@]}" "${{ matrix.repo }}"
# Dry-run release
- name: Run Gradle Release (DRY_RUN)
if: ${{ github.event.inputs.dryRun == 'true' }}
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
--dry-run \
--yes \
${{ steps.plugins-list.outputs.repositories }}
chmod +x ./dev-tools/release-plugins.sh
ARGS=()
ARGS+=(--release-version="${{ github.event.inputs.releaseVersion }}")
ARGS+=(--next-version="${{ github.event.inputs.nextVersion }}")
ARGS+=(--dry-run)
ARGS+=(--yes)
if [ "${{ github.event.inputs.onlyChanged }}" = "true" ]; then
ARGS+=(--only-changed)
fi
if [ -n "${{ github.event.inputs.sinceTag }}" ]; then
ARGS+=(--since-tag="${{ github.event.inputs.sinceTag }}")
fi
./dev-tools/release-plugins.sh "${ARGS[@]}" "${{ matrix.repo }}"

View File

@@ -43,7 +43,8 @@ jobs:
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
end:
runs-on: ubuntu-latest
needs:

View File

@@ -1,23 +1,7 @@
name: Build Artifacts
on:
workflow_call:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: true
type: string
outputs:
docker-tag:
value: ${{ jobs.build.outputs.docker-tag }}
description: "The Docker image Tag for Kestra"
docker-artifact-name:
value: ${{ jobs.build.outputs.docker-artifact-name }}
description: "The GitHub artifact containing the Kestra docker image name."
plugins:
value: ${{ jobs.build.outputs.plugins }}
description: "The Kestra plugins list used for the build."
workflow_call: {}
jobs:
build:
@@ -82,55 +66,6 @@ jobs:
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker Tag
- name: Setup - Docker vars
id: vars
shell: bash
run: |
TAG=${GITHUB_REF#refs/*/}
if [[ $TAG = "master" ]]
then
TAG="latest";
elif [[ $TAG = "develop" ]]
then
TAG="develop";
elif [[ $TAG = v* ]]
then
TAG="${TAG}";
else
TAG="build-${{ github.run_id }}";
fi
echo "tag=${TAG}" >> $GITHUB_OUTPUT
echo "artifact=docker-kestra-${TAG}" >> $GITHUB_OUTPUT
# Docker setup
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Docker - Setup Buildx
uses: docker/setup-buildx-action@v3
# Docker Build
- name: Docker - Build & export image
uses: docker/build-push-action@v6
if: "!startsWith(github.ref, 'refs/tags/v')"
with:
context: .
push: false
file: Dockerfile
tags: |
kestra/kestra:${{ steps.vars.outputs.tag }}
build-args: |
KESTRA_PLUGINS=${{ steps.plugins.outputs.plugins }}
APT_PACKAGES=${{ env.DOCKER_APT_PACKAGES }}
PYTHON_LIBRARIES=${{ env.DOCKER_PYTHON_LIBRARIES }}
outputs: type=docker,dest=/tmp/${{ steps.vars.outputs.artifact }}.tar
# Upload artifacts
- name: Artifacts - Upload JAR
uses: actions/upload-artifact@v4
@@ -143,10 +78,3 @@ jobs:
with:
name: exe
path: build/executable/
- name: Artifacts - Upload Docker
uses: actions/upload-artifact@v4
if: "!startsWith(github.ref, 'refs/tags/v')"
with:
name: ${{ steps.vars.outputs.artifact }}
path: /tmp/${{ steps.vars.outputs.artifact }}.tar

View File

@@ -1,14 +1,17 @@
name: Github - Release
on:
workflow_dispatch:
workflow_call:
secrets:
GH_PERSONAL_TOKEN:
description: "The Github personal token."
required: true
push:
tags:
- '*'
SLACK_RELEASES_WEBHOOK_URL:
description: "The Slack webhook URL."
required: true
jobs:
publish:
@@ -35,7 +38,7 @@ jobs:
# Download Exec
# Must be done after checkout actions
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
uses: actions/download-artifact@v5
if: startsWith(github.ref, 'refs/tags/v')
with:
name: exe

View File

@@ -41,8 +41,6 @@ jobs:
name: Build Artifacts
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
uses: ./.github/workflows/workflow-build-artifacts.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
# ********************************************************************************************************************
# Docker
# ********************************************************************************************************************
@@ -122,7 +120,7 @@ jobs:
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable

View File

@@ -0,0 +1,15 @@
name: Pull Request - Delete Docker
on:
pull_request:
types: [closed]
jobs:
publish:
name: Pull Request - Delete Docker
runs-on: ubuntu-latest
steps:
- uses: dataaxiom/ghcr-cleanup-action@v1
with:
package: kestra-pr
delete-tags: ${{ github.event.pull_request.number }}

View File

@@ -0,0 +1,76 @@
name: Pull Request - Publish Docker
on:
pull_request:
branches:
- develop
jobs:
build-artifacts:
name: Build Artifacts
uses: ./.github/workflows/workflow-build-artifacts.yml
publish:
name: Publish Docker
runs-on: ubuntu-latest
needs: build-artifacts
env:
GITHUB_IMAGE_PATH: "ghcr.io/kestra-io/kestra-pr"
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
with:
fetch-depth: 0
# Docker setup
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Setup Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to GHCR
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Docker - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
- name: Docker - Build image
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile.pr
push: true
tags: ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}
platforms: linux/amd64,linux/arm64
# Add comment on pull request
- name: Add comment to PR
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
await github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: `**🐋 Docker image**: \`${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}\`\n` +
`\n` +
`\`\`\`bash\n` +
`docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }} server local\n` +
`\`\`\``
})

View File

@@ -42,12 +42,16 @@ on:
SONATYPE_GPG_FILE:
description: "The Sonatype GPG file."
required: true
GH_PERSONAL_TOKEN:
description: "GH personnal Token."
required: true
SLACK_RELEASES_WEBHOOK_URL:
description: "Slack webhook for releases channel."
required: true
jobs:
build-artifacts:
name: Build - Artifacts
uses: ./.github/workflows/workflow-build-artifacts.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
Docker:
name: Publish Docker
@@ -77,4 +81,5 @@ jobs:
if: startsWith(github.ref, 'refs/tags/v')
uses: ./.github/workflows/workflow-github-release.yml
secrets:
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}

View File

@@ -19,6 +19,7 @@
#plugin-databricks:io.kestra.plugin:plugin-databricks:LATEST
#plugin-datahub:io.kestra.plugin:plugin-datahub:LATEST
#plugin-dataform:io.kestra.plugin:plugin-dataform:LATEST
#plugin-datagen:io.kestra.plugin:plugin-datagen:LATEST
#plugin-dbt:io.kestra.plugin:plugin-dbt:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-db2:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-mongodb:LATEST
@@ -26,6 +27,7 @@
#plugin-debezium:io.kestra.plugin:plugin-debezium-oracle:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-postgres:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-sqlserver:LATEST
#plugin-deepseek:io.kestra.plugin:plugin-deepseek:LATEST
#plugin-docker:io.kestra.plugin:plugin-docker:LATEST
#plugin-elasticsearch:io.kestra.plugin:plugin-elasticsearch:LATEST
#plugin-fivetran:io.kestra.plugin:plugin-fivetran:LATEST
@@ -86,13 +88,18 @@
#plugin-powerbi:io.kestra.plugin:plugin-powerbi:LATEST
#plugin-pulsar:io.kestra.plugin:plugin-pulsar:LATEST
#plugin-redis:io.kestra.plugin:plugin-redis:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-bun:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-deno:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-go:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-groovy:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-jbang:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-julia:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-jython:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-lua:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-nashorn:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-node:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-perl:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-php:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-powershell:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-python:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-r:LATEST

7
Dockerfile.pr Normal file
View File

@@ -0,0 +1,7 @@
FROM kestra/kestra:develop
USER root
COPY --chown=kestra:kestra docker /
USER kestra

View File

@@ -65,10 +65,6 @@ Kestra is an open-source, event-driven orchestration platform that makes both **
## 🚀 Quick Start
### Try the Live Demo
Try Kestra with our [**Live Demo**](https://demo.kestra.io/ui/login?auto). No installation required!
### Get Started Locally in 5 Minutes
#### Launch Kestra in Docker

View File

@@ -16,7 +16,7 @@ plugins {
id "java"
id 'java-library'
id "idea"
id "com.gradleup.shadow" version "8.3.8"
id "com.gradleup.shadow" version "8.3.9"
id "application"
// test
@@ -225,14 +225,14 @@ subprojects {
}
testlogger {
theme 'mocha-parallel'
showExceptions true
showFullStackTraces true
showCauses true
slowThreshold 2000
showStandardStreams true
showPassedStandardStreams false
showSkippedStandardStreams true
theme = 'mocha-parallel'
showExceptions = true
showFullStackTraces = true
showCauses = true
slowThreshold = 2000
showStandardStreams = true
showPassedStandardStreams = false
showSkippedStandardStreams = true
}
}
}
@@ -410,7 +410,7 @@ jar {
shadowJar {
archiveClassifier.set(null)
mergeServiceFiles()
zip64 true
zip64 = true
}
distZip.dependsOn shadowJar
@@ -427,8 +427,8 @@ def executableDir = layout.buildDirectory.dir("executable")
def executable = layout.buildDirectory.file("executable/${project.name}-${project.version}").get().asFile
tasks.register('writeExecutableJar') {
group "build"
description "Write an executable jar from shadow jar"
group = "build"
description = "Write an executable jar from shadow jar"
dependsOn = [shadowJar]
final shadowJarFile = tasks.shadowJar.outputs.files.singleFile
@@ -454,8 +454,8 @@ tasks.register('writeExecutableJar') {
}
tasks.register('executableJar', Zip) {
group "build"
description "Zip the executable jar"
group = "build"
description = "Zip the executable jar"
dependsOn = [writeExecutableJar]
archiveFileName = "${project.name}-${project.version}.zip"
@@ -620,6 +620,28 @@ subprojects {subProject ->
}
}
}
if (subProject.name != 'platform' && subProject.name != 'cli') {
// only if a test source set actually exists (avoids empty artifacts)
def hasTests = subProject.extensions.findByName('sourceSets')?.findByName('test') != null
if (hasTests) {
// wire the artifact onto every Maven publication of this subproject
publishing {
publications {
withType(MavenPublication).configureEach { pub ->
// keep the normal java component + sources/javadoc already configured
pub.artifact(subProject.tasks.named('testsJar').get())
}
}
}
// make sure publish tasks build the tests jar first
tasks.matching { it.name.startsWith('publish') }.configureEach {
dependsOn subProject.tasks.named('testsJar')
}
}
}
}
}

View File

@@ -16,6 +16,6 @@ abstract public class AbstractServerCommand extends AbstractCommand implements S
}
protected static int defaultWorkerThread() {
return Runtime.getRuntime().availableProcessors() * 4;
return Runtime.getRuntime().availableProcessors() * 8;
}
}

View File

@@ -48,7 +48,7 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
private String tenantId;
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to four times the number of available processors. Set it to 0 to avoid starting a worker.")
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to eight times the number of available processors. Set it to 0 to avoid starting a worker.")
private int workerThread = defaultWorkerThread();
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")

View File

@@ -22,7 +22,7 @@ public class WorkerCommand extends AbstractServerCommand {
@Inject
private ApplicationContext applicationContext;
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to four times the number of available processors")
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to eight times the number of available processors")
private int thread = defaultWorkerThread();
@Option(names = {"-g", "--worker-group"}, description = "The worker group key, must match the regex [a-zA-Z0-9_-]+ (EE only)")

View File

@@ -162,7 +162,15 @@ public class FileChangedEventListener {
}
} catch (NoSuchFileException e) {
log.error("File not found: {}", entry, e);
log.warn("File not found: {}, deleting it", entry, e);
// the file might have been deleted while reading so if not found we try to delete the flow
flows.stream()
.filter(flow -> flow.getPath().equals(filePath.toString()))
.findFirst()
.ifPresent(flowWithPath -> {
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
});
} catch (IOException e) {
log.error("Error reading file: {}", entry, e);
}

View File

@@ -122,12 +122,13 @@ public class JsonSchemaGenerator {
if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
.map(JsonNode::asText)
.toList();
.collect(Collectors.toList());
properties.fields().forEachRemaining(e -> {
int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
requiredPropsNode.remove(indexInRequiredArray);
requiredFieldValues.remove(indexInRequiredArray);
}
});

View File

@@ -1,11 +1,10 @@
package io.kestra.core.models;
import io.kestra.core.utils.MapUtils;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
public record Label(@NotNull String key, @NotNull String value) {
@@ -29,11 +28,36 @@ public record Label(@NotNull String key, @NotNull String value) {
* @return the nested {@link Map}.
*/
public static Map<String, Object> toNestedMap(List<Label> labels) {
Map<String, Object> asMap = labels.stream()
return MapUtils.flattenToNestedMap(toMap(labels));
}
/**
* Static helper method for converting a list of labels to a flat map.
* Key order is kept.
*
* @param labels The list of {@link Label} to be converted.
* @return the flat {@link Map}.
*/
public static Map<String, String> toMap(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyMap();
return labels.stream()
.filter(label -> label.value() != null && label.key() != null)
// using an accumulator in case labels with the same key exists: the first is kept
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> first));
return MapUtils.flattenToNestedMap(asMap);
// using an accumulator in case labels with the same key exists: the second is kept
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
}
/**
* Static helper method for deduplicating a list of labels by their key.
* Value of the last key occurrence is kept.
*
* @param labels The list of {@link Label} to be deduplicated.
* @return the deduplicated {@link List}.
*/
public static List<Label> deduplicate(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyList();
return toMap(labels).entrySet().stream()
.map(entry -> new Label(entry.getKey(), entry.getValue()))
.collect(Collectors.toCollection(ArrayList::new));
}
/**

View File

@@ -6,9 +6,9 @@ import com.fasterxml.jackson.annotation.JsonValue;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.models.dashboards.filters.*;
import io.kestra.core.utils.Enums;
import java.util.ArrayList;
import lombok.Builder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -49,42 +49,27 @@ public record QueryFilter(
PREFIX
}
@SuppressWarnings("unchecked")
private List<Object> asValues(Object value) {
return value instanceof String valueStr ? Arrays.asList(valueStr.split(",")) : (List<Object>) value;
}
@SuppressWarnings("unchecked")
public <T extends Enum<T>> AbstractFilter<T> toDashboardFilterBuilder(T field, Object value) {
switch (this.operation) {
case EQUALS:
return EqualTo.<T>builder().field(field).value(value).build();
case NOT_EQUALS:
return NotEqualTo.<T>builder().field(field).value(value).build();
case GREATER_THAN:
return GreaterThan.<T>builder().field(field).value(value).build();
case LESS_THAN:
return LessThan.<T>builder().field(field).value(value).build();
case GREATER_THAN_OR_EQUAL_TO:
return GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
case LESS_THAN_OR_EQUAL_TO:
return LessThanOrEqualTo.<T>builder().field(field).value(value).build();
case IN:
return In.<T>builder().field(field).values(asValues(value)).build();
case NOT_IN:
return NotIn.<T>builder().field(field).values(asValues(value)).build();
case STARTS_WITH:
return StartsWith.<T>builder().field(field).value(value.toString()).build();
case ENDS_WITH:
return EndsWith.<T>builder().field(field).value(value.toString()).build();
case CONTAINS:
return Contains.<T>builder().field(field).value(value.toString()).build();
case REGEX:
return Regex.<T>builder().field(field).value(value.toString()).build();
case PREFIX:
return Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
default:
throw new IllegalArgumentException("Unsupported operation: " + this.operation);
}
return switch (this.operation) {
case EQUALS -> EqualTo.<T>builder().field(field).value(value).build();
case NOT_EQUALS -> NotEqualTo.<T>builder().field(field).value(value).build();
case GREATER_THAN -> GreaterThan.<T>builder().field(field).value(value).build();
case LESS_THAN -> LessThan.<T>builder().field(field).value(value).build();
case GREATER_THAN_OR_EQUAL_TO -> GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
case LESS_THAN_OR_EQUAL_TO -> LessThanOrEqualTo.<T>builder().field(field).value(value).build();
case IN -> In.<T>builder().field(field).values(asValues(value)).build();
case NOT_IN -> NotIn.<T>builder().field(field).values(asValues(value)).build();
case STARTS_WITH -> StartsWith.<T>builder().field(field).value(value.toString()).build();
case ENDS_WITH -> EndsWith.<T>builder().field(field).value(value.toString()).build();
case CONTAINS -> Contains.<T>builder().field(field).value(value.toString()).build();
case REGEX -> Regex.<T>builder().field(field).value(value.toString()).build();
case PREFIX -> Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
};
}
public enum Field {
@@ -154,6 +139,12 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
EXECUTION_ID("executionId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
CHILD_FILTER("childFilter") {
@Override
public List<Op> supportedOp() {
@@ -228,7 +219,7 @@ public record QueryFilter(
@Override
public List<Field> supportedField() {
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.START_DATE,
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL, Field.EXECUTION_ID
);
}
},

View File

@@ -25,6 +25,7 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.LabelService;
import io.kestra.core.test.flow.TaskFixture;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
@@ -131,12 +132,12 @@ public class Execution implements DeletedInterface, TenantInterface {
* @param labels The Flow labels.
* @return a new {@link Execution}.
*/
public static Execution newExecution(final Flow flow, final List<Label> labels) {
public static Execution newExecution(final FlowInterface flow, final List<Label> labels) {
return newExecution(flow, null, labels, Optional.empty());
}
public List<Label> getLabels() {
return Optional.ofNullable(this.labels).orElse(new ArrayList<>());
return ListUtils.emptyOnNull(this.labels);
}
/**
@@ -181,8 +182,22 @@ public class Execution implements DeletedInterface, TenantInterface {
}
/**
* Customization of Lombok-generated builder.
*/
public static class ExecutionBuilder {
/**
* Enforce unique values of {@link Label} when using the builder.
*
* @param labels The labels.
* @return Deduplicated labels.
*/
public ExecutionBuilder labels(List<Label> labels) {
this.labels = Label.deduplicate(labels);
return this;
}
void prebuild() {
this.originalId = this.id;
this.metadata = ExecutionMetadata.builder()
@@ -231,7 +246,6 @@ public class Execution implements DeletedInterface, TenantInterface {
}
public Execution withLabels(List<Label> labels) {
return new Execution(
this.tenantId,
this.id,
@@ -241,7 +255,7 @@ public class Execution implements DeletedInterface, TenantInterface {
this.taskRunList,
this.inputs,
this.outputs,
labels,
Label.deduplicate(labels),
this.variables,
this.state,
this.parentId,
@@ -400,7 +414,7 @@ public class Execution implements DeletedInterface, TenantInterface {
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedErrors finally tasks
* @param resolvedFinally finally tasks
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(

View File

@@ -38,6 +38,8 @@ public abstract class AbstractFlow implements FlowInterface {
@Min(value = 1)
Integer revision;
String description;
@Valid
List<Input<?>> inputs;

View File

@@ -61,13 +61,10 @@ public class Flow extends AbstractFlow implements HasUID {
}
});
String description;
Map<String, Object> variables;
@Valid
@NotEmpty
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
List<Task> tasks;
@@ -125,7 +122,7 @@ public class Flow extends AbstractFlow implements HasUID {
AbstractRetry retry;
@Valid
@PluginProperty(beta = true)
@PluginProperty
List<SLA> sla;
public Stream<String> allTypes() {

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import java.util.Optional;
@@ -57,6 +58,7 @@ public interface FlowId {
@Getter
@AllArgsConstructor
@EqualsAndHashCode
class Default implements FlowId {
private final String tenantId;
private final String namespace;

View File

@@ -31,6 +31,8 @@ public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface
Pattern YAML_REVISION_MATCHER = Pattern.compile("(?m)^revision: \\d+\n?");
String getDescription();
boolean isDisabled();
boolean isDeleted();

View File

@@ -116,7 +116,7 @@ public class State {
}
public Instant maxDate() {
if (this.histories.size() == 0) {
if (this.histories.isEmpty()) {
return Instant.now();
}
@@ -124,7 +124,7 @@ public class State {
}
public Instant minDate() {
if (this.histories.size() == 0) {
if (this.histories.isEmpty()) {
return Instant.now();
}
@@ -173,6 +173,11 @@ public class State {
return this.current.isBreakpoint();
}
@JsonIgnore
public boolean isQueued() {
return this.current.isQueued();
}
@JsonIgnore
public boolean isRetrying() {
return this.current.isRetrying();
@@ -206,6 +211,14 @@ public class State {
return this.histories.get(this.histories.size() - 2).state.isPaused();
}
/**
* Return true if the execution has failed, then was restarted.
* This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
*/
public boolean failedThenRestarted() {
return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
}
@Introspected
public enum Type {
CREATED,
@@ -264,6 +277,10 @@ public class State {
return this == Type.KILLED;
}
public boolean isQueued(){
return this == Type.QUEUED;
}
/**
* @return states that are terminal to an execution
*/

View File

@@ -20,9 +20,8 @@ public class FileInput extends Input<URI> {
private static final String DEFAULT_EXTENSION = ".upl";
@Builder.Default
@Deprecated(since = "0.24", forRemoval = true)
public String extension = DEFAULT_EXTENSION;
public String extension;
@Override
public void validate(URI input) throws ConstraintViolationException {
@@ -33,6 +32,7 @@ public class FileInput extends Input<URI> {
String res = inputs.stream()
.filter(in -> in instanceof FileInput)
.filter(in -> in.getId().equals(fileName))
.filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
.map(flowInput -> ((FileInput) flowInput).getExtension())
.findFirst()
.orElse(FileInput.DEFAULT_EXTENSION);

View File

@@ -222,6 +222,7 @@ public class Trigger extends TriggerContext implements HasUID {
}
// If trigger is a schedule and execution ended after the next execution date
else if (abstractTrigger instanceof Schedule schedule &&
this.getNextExecutionDate() != null &&
execution.getState().getEndDate().get().isAfter(this.getNextExecutionDate().toInstant())
) {
RecoverMissedSchedules recoverMissedSchedules = Optional.ofNullable(schedule.getRecoverMissedSchedules())

View File

@@ -28,6 +28,7 @@ public interface QueueFactoryInterface {
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
QueueInterface<Execution> execution();
@@ -58,4 +59,6 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionResult> subflowExecutionResult();
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<ExecutionRunning> executionRunning();
}

View File

@@ -27,7 +27,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
void delete(String consumerGroup, T message) throws QueueException;
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
return receive((String) null, consumer);
return receive(null, consumer, false);
}
default Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {

View File

@@ -27,8 +27,6 @@ public class QueueService {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == MetricEntry.class) {
return null;
} else if (object.getClass() == ExecutionRunning.class) {
return ((ExecutionRunning) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionEnd.class) {
return ((SubflowExecutionEnd) object).getParentExecutionId();
} else {

View File

@@ -0,0 +1,12 @@
package io.kestra.core.queues;
import java.io.Serial;
public class UnsupportedMessageException extends QueueException {
@Serial
private static final long serialVersionUID = 1L;
public UnsupportedMessageException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -161,7 +161,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
}
List<Execution> lastExecutions(
@Nullable String tenantId,
String tenantId,
@Nullable List<FlowFilter> flows
);
}

View File

@@ -81,9 +81,7 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
Flux<LogEntry> findAsync(
@Nullable String tenantId,
@Nullable String namespace,
@Nullable Level minLevel,
ZonedDateTime startDate
List<QueryFilter> filters
);
Flux<LogEntry> findAllAsync(@Nullable String tenantId);
@@ -96,5 +94,7 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
void deleteByQuery(String tenantId, String namespace, String flowId, String triggerId);
void deleteByFilters(String tenantId, List<QueryFilter> filters);
int deleteByQuery(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate);
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.utils.IdUtils;
import jakarta.validation.constraints.NotNull;
@@ -11,7 +12,7 @@ import lombok.With;
@Value
@AllArgsConstructor
@Builder
public class ExecutionRunning {
public class ExecutionRunning implements HasUID {
String tenantId;
@NotNull
@@ -26,6 +27,7 @@ public class ExecutionRunning {
@With
ConcurrencyState concurrencyState;
@Override
public String uid() {
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId, this.execution.getId());
}

View File

@@ -86,7 +86,7 @@ public class Executor {
public Boolean canBeProcessed() {
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint());
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
}
public Executor withFlow(FlowWithSource flow) {

View File

@@ -102,49 +102,39 @@ public class ExecutorService {
return this.flowExecutorInterface;
}
public Executor checkConcurrencyLimit(Executor executor, FlowInterface flow, Execution execution, long count) {
// if above the limit, handle concurrency limit based on its behavior
if (count >= flow.getConcurrency().getLimit()) {
public ExecutionRunning processExecutionRunning(FlowInterface flow, int runningCount, ExecutionRunning executionRunning) {
// if concurrency was removed, it can be null as we always get the latest flow definition
if (flow.getConcurrency() != null && runningCount >= flow.getConcurrency().getLimit()) {
return switch (flow.getConcurrency().getBehavior()) {
case QUEUE -> {
var newExecution = execution.withState(State.Type.QUEUED);
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.execution(newExecution)
.concurrencyState(ExecutionRunning.ConcurrencyState.QUEUED)
.build();
// when max concurrency is reached, we throttle the execution and stop processing
logService.logExecution(
newExecution,
executionRunning.getExecution(),
Level.INFO,
"Flow is queued due to concurrency limit exceeded, {} running(s)",
count
"Execution is queued due to concurrency limit exceeded, {} running(s)",
runningCount
);
// return the execution queued
yield executor
.withExecutionRunning(executionRunning)
.withExecution(newExecution, "checkConcurrencyLimit");
var newExecution = executionRunning.getExecution().withState(State.Type.QUEUED);
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
yield executionRunning
.withExecution(newExecution)
.withConcurrencyState(ExecutionRunning.ConcurrencyState.QUEUED);
}
case CANCEL ->
executor.withExecution(execution.withState(State.Type.CANCELLED), "checkConcurrencyLimit");
executionRunning
.withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
case FAIL ->
executor.withException(new IllegalStateException("Flow is FAILED due to concurrency limit exceeded"), "checkConcurrencyLimit");
executionRunning
.withExecution(executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded")).getExecution())
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
};
}
// if under the limit, update the executor with a RUNNING ExecutionRunning to track them
var executionRunning = new ExecutionRunning(
flow.getTenantId(),
flow.getNamespace(),
flow.getId(),
executor.getExecution(),
ExecutionRunning.ConcurrencyState.RUNNING
);
return executor.withExecutionRunning(executionRunning);
// if under the limit, run it!
return executionRunning
.withExecution(executionRunning.getExecution().withState(State.Type.RUNNING))
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
}
public Executor process(Executor executor) {

View File

@@ -286,18 +286,10 @@ public class FlowableUtils {
// start as many tasks as we have concurrency slots
return collect.values().stream()
.map(resolvedTasks -> filterCreated(resolvedTasks, taskRuns, parentTaskRun))
.map(resolvedTasks -> resolveSequentialNexts(execution, resolvedTasks, null, null, parentTaskRun))
.filter(resolvedTasks -> !resolvedTasks.isEmpty())
.limit(concurrencySlots)
.map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
.toList();
}
private static List<ResolvedTask> filterCreated(List<ResolvedTask> tasks, List<TaskRun> taskRuns, TaskRun parentTaskRun) {
return tasks.stream()
.filter(resolvedTask -> taskRuns.stream()
.noneMatch(taskRun -> FlowableUtils.isTaskRunFor(resolvedTask, taskRun, parentTaskRun))
)
.map(resolvedTasks -> resolvedTasks.getFirst())
.toList();
}

View File

@@ -764,6 +764,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
this.workerTaskResultQueue.emit(workerTaskResult);
// upload the cache file, hash may not be present if we didn't succeed in computing it
@@ -796,6 +797,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
// If it's a message too big, we remove the outputs
failed = failed.withOutputs(Variables.empty());
}
if (e instanceof UnsupportedMessageException) {
// we expect the offending char is in the output so we remove it
failed = failed.withOutputs(Variables.empty());
}
WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
@@ -818,7 +823,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
private Optional<String> hashTask(RunContext runContext, Task task) {
try {
var map = JacksonMapper.toMap(task);
var rMap = runContext.render(map);
// If there are task provided variables, rendering the task may fail.
// The best we can do is to add a fake 'workingDir' as it's an often added variables,
// and it should not be part of the task hash.
Map<String, Object> variables = Map.of("workingDir", "workingDir");
var rMap = runContext.render(map, variables);
var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
MessageDigest digest = MessageDigest.getInstance("SHA-256");
digest.update(json);

View File

@@ -102,6 +102,19 @@ public abstract class AbstractDate {
}
if (value instanceof Long longValue) {
if(value.toString().length() == 13) {
return Instant.ofEpochMilli(longValue).atZone(zoneId);
}else if(value.toString().length() == 19 ){
if(value.toString().endsWith("000")){
long seconds = longValue/1_000_000_000;
int nanos = (int) (longValue%1_000_000_000);
return Instant.ofEpochSecond(seconds,nanos).atZone(zoneId);
}else{
long milliseconds = longValue/1_000_000;
int micros = (int) (longValue%1_000_000);
return Instant.ofEpochMilli(milliseconds).atZone(zoneId).withNano(micros*1000);
}
}
return Instant.ofEpochSecond(longValue).atZone(zoneId);
}

View File

@@ -8,6 +8,7 @@ import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
@@ -318,7 +319,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
}
synchronized (this) { // we need a sync block as we read then update so we should not do it in multiple threads concurrently
List<Trigger> triggers = triggerState.findAllForAllTenants();
Map<String, Trigger> triggers = triggerState.findAllForAllTenants().stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
flows
.stream()
@@ -328,7 +329,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.flatMap(flow -> flow.getTriggers().stream().filter(trigger -> trigger instanceof WorkerTriggerInterface).map(trigger -> new FlowAndTrigger(flow, trigger)))
.distinct()
.forEach(flowAndTrigger -> {
Optional<Trigger> trigger = triggers.stream().filter(t -> t.uid().equals(Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger()))).findFirst(); // must have one or none
String triggerUid = Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger());
Optional<Trigger> trigger = Optional.ofNullable(triggers.get(triggerUid));
if (trigger.isEmpty()) {
RunContext runContext = runContextFactory.of(flowAndTrigger.flow(), flowAndTrigger.trigger());
ConditionContext conditionContext = conditionService.conditionContext(runContext, flowAndTrigger.flow(), null);
@@ -467,9 +469,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private List<FlowWithTriggers> computeSchedulable(List<FlowWithSource> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
List<String> flowIds = flows.stream().map(FlowId::uidWithoutRevision).toList();
Map<String, Trigger> triggerById = triggerContextsToEvaluate.stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
// delete trigger which flow has been deleted
triggerContextsToEvaluate.stream()
.filter(trigger -> !flows.stream().map(FlowId::uidWithoutRevision).toList().contains(FlowId.uid(trigger)))
.filter(trigger -> !flowIds.contains(FlowId.uid(trigger)))
.forEach(trigger -> {
try {
this.triggerState.delete(trigger);
@@ -491,12 +496,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.map(abstractTrigger -> {
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
Trigger triggerContext = null;
Trigger lastTrigger = triggerContextsToEvaluate
.stream()
.filter(triggerContextToFind -> triggerContextToFind.uid().equals(Trigger.uid(flow, abstractTrigger)))
.findFirst()
.orElse(null);
Trigger triggerContext;
Trigger lastTrigger = triggerById.get(Trigger.uid(flow, abstractTrigger));
// If a trigger is not found in triggers to evaluate, then we ignore it
if (lastTrigger == null) {
return null;

View File

@@ -250,9 +250,10 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
stateLock.lock();
// Optional callback to be executed at the end.
Runnable returnCallback = null;
localServiceState = localServiceState(service);
try {
localServiceState = localServiceState(service);
if (localServiceState == null) {
return null; // service has been unregistered.
}
@@ -301,7 +302,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
// Update the local instance
this.serviceRegistry.register(localServiceState.with(remoteInstance));
} catch (Exception e) {
final ServiceInstance localInstance = localServiceState(service).instance();
final ServiceInstance localInstance = localServiceState.instance();
log.error("[Service id={}, type='{}', hostname='{}'] Failed to update state to {}. Error: {}",
localInstance.uid(),
localInstance.type(),
@@ -317,7 +318,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
returnCallback.run();
}
}
return localServiceState(service).instance();
return Optional.ofNullable(localServiceState(service)).map(LocalServiceState::instance).orElse(null);
}
private void mayDisableStateUpdate(final Service service, final ServiceInstance instance) {
@@ -371,9 +372,11 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
final Service service,
final ServiceInstance instance,
final boolean isLivenessEnabled) {
// Never shutdown STANDALONE server or WEB_SERVER service.
if (instance.server().type().equals(ServerInstance.Type.STANDALONE) ||
instance.is(ServiceType.WEBSERVER)) {
// Never shutdown STANDALONE server or WEBSERVER and INDEXER services.
if (ServerInstance.Type.STANDALONE.equals(instance.server().type()) ||
instance.is(ServiceType.INDEXER) ||
instance.is(ServiceType.WEBSERVER)
) {
// Force the RUNNING state.
return Optional.of(instance.state(Service.ServiceState.RUNNING, now, null));
}

View File

@@ -1,5 +1,8 @@
package io.kestra.core.server;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.kestra.core.utils.Enums;
/**
* Supported Kestra's service types.
*/
@@ -9,4 +12,14 @@ public enum ServiceType {
SCHEDULER,
WEBSERVER,
WORKER,
INVALID;
@JsonCreator
public static ServiceType fromString(final String value) {
try {
return Enums.getForNameIgnoreCase(value, ServiceType.class, INVALID);
} catch (IllegalArgumentException e) {
return INVALID;
}
}
}

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ModelValidator;
@@ -51,7 +52,6 @@ import java.util.stream.StreamSupport;
@Singleton
@Slf4j
public class FlowService {
@Inject
Optional<FlowRepositoryInterface> flowRepository;
@@ -236,6 +236,7 @@ public class FlowService {
}
List<String> warnings = new ArrayList<>(checkValidSubflows(flow, tenantId));
List<io.kestra.plugin.core.trigger.Flow> flowTriggers = ListUtils.emptyOnNull(flow.getTriggers()).stream()
.filter(io.kestra.plugin.core.trigger.Flow.class::isInstance)
.map(io.kestra.plugin.core.trigger.Flow.class::cast)
@@ -246,6 +247,21 @@ public class FlowService {
}
});
// add warning for runnable properties (timeout, workerGroup, taskCache) when used not in a runnable
flow.allTasksWithChilds().forEach(task -> {
if (!(task instanceof RunnableTask<?>)) {
if (task.getTimeout() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'timeout' property as it's only relevant for runnable tasks.");
}
if (task.getTaskCache() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'taskCache' property as it's only relevant for runnable tasks.");
}
if (task.getWorkerGroup() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'workerGroup' property as it's only relevant for runnable tasks.");
}
}
});
return warnings;
}
@@ -531,29 +547,26 @@ public class FlowService {
throw noRepositoryException();
}
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly);
return expandAll ? recursiveFlowTopology(tenant, namespace, id, destinationOnly) : flowTopologies.stream();
return expandAll ? recursiveFlowTopology(new ArrayList<>(), tenant, namespace, id, destinationOnly) : flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly).stream();
}
private Stream<FlowTopology> recursiveFlowTopology(String tenantId, String namespace, String flowId, boolean destinationOnly) {
private Stream<FlowTopology> recursiveFlowTopology(List<FlowId> flowIds, String tenantId, String namespace, String id, boolean destinationOnly) {
if (flowTopologyRepository.isEmpty()) {
throw noRepositoryException();
}
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, flowId, destinationOnly);
List<FlowTopology> subTopologies = flowTopologies.stream()
// filter on destination is not the current node to avoid an infinite loop
.filter(topology -> !(topology.getDestination().getTenantId().equals(tenantId) && topology.getDestination().getNamespace().equals(namespace) && topology.getDestination().getId().equals(flowId)))
.toList();
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, id, destinationOnly);
if (subTopologies.isEmpty()) {
FlowId flowId = FlowId.of(tenantId, namespace, id, null);
if (flowIds.contains(flowId)) {
return flowTopologies.stream();
} else {
return Stream.concat(flowTopologies.stream(), subTopologies.stream()
.map(topology -> topology.getDestination())
// recursively fetch child nodes
.flatMap(destination -> recursiveFlowTopology(destination.getTenantId(), destination.getNamespace(), destination.getId(), destinationOnly)));
}
flowIds.add(flowId);
return flowTopologies.stream()
.flatMap(topology -> Stream.of(topology.getDestination(), topology.getSource()))
// recursively fetch child nodes
.flatMap(node -> recursiveFlowTopology(flowIds, node.getTenantId(), node.getNamespace(), node.getId(), destinationOnly));
}
private IllegalStateException noRepositoryException() {

View File

@@ -1,37 +1,193 @@
package io.kestra.core.utils;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
public class Either<L, R> {
private final Optional<L> left;
private final Optional<R> right;
private Either(Optional<L> left, Optional<R> right) {
this.left = left;
this.right = right;
}
/**
* Simple {@link Either} monad type.
*
* @param <L> the {@link Left} type.
* @param <R> the {@link Right} type.
*/
public abstract sealed class Either<L, R> permits Either.Left, Either.Right {
public static <L, R> Either<L, R> left(L value) {
return new Either<>(Optional.ofNullable(value), Optional.empty());
return new Left<>(value);
}
public boolean isLeft() {
return this.left.isPresent();
}
public L getLeft() {
return this.left.get();
}
public static <L, R> Either<L, R> right(R value) {
return new Either<>(Optional.empty(), Optional.ofNullable(value));
return new Right<>(value);
}
public boolean isRight() {
return this.right.isPresent();
/**
* Returns {@code true} if this is a {@link Left}, {@code false} otherwise.
*/
public abstract boolean isLeft();
/**
* Returns {@code true} if this is a {@link Right}, {@code false} otherwise.
*/
public abstract boolean isRight();
/**
* Returns the left value.
*
* @throws NoSuchElementException if is not left.
*/
public abstract L getLeft();
/**
* Returns the right value.
*
* @throws NoSuchElementException if is not right.
*/
public abstract R getRight();
public LeftProjection<L, R> left() {
return new LeftProjection<>(this);
}
public R getRight() {
return this.right.get();
public RightProjection<L, R> right() {
return new RightProjection<>(this);
}
}
public <T> T fold(final Function<L, T> fl, final Function<R, T> fr) {
return isLeft() ? fl.apply(getLeft()) : fr.apply(getRight());
}
/**
 * The left variant of an {@link Either}, carrying a value of type {@code L}.
 */
public static final class Left<L, R> extends Either<L, R> {

    private final L value;

    private Left(final L leftValue) {
        this.value = leftValue;
    }

    /** @return the wrapped left value. */
    @Override
    public L getLeft() {
        return value;
    }

    /**
     * @throws NoSuchElementException always, since a {@link Left} holds no right value.
     */
    @Override
    public R getRight() {
        throw new NoSuchElementException("This is Left");
    }

    /** @return always {@code true}. */
    @Override
    public boolean isLeft() {
        return true;
    }

    /** @return always {@code false}. */
    @Override
    public boolean isRight() {
        return false;
    }
}
/**
 * The right variant of an {@link Either}, carrying a value of type {@code R}.
 */
public static final class Right<L, R> extends Either<L, R> {

    private final R value;

    private Right(final R rightValue) {
        this.value = rightValue;
    }

    /**
     * @throws NoSuchElementException always, since a {@link Right} holds no left value.
     */
    @Override
    public L getLeft() {
        throw new NoSuchElementException("This is Right");
    }

    /** @return the wrapped right value. */
    @Override
    public R getRight() {
        return value;
    }

    /** @return always {@code false}. */
    @Override
    public boolean isLeft() {
        return false;
    }

    /** @return always {@code true}. */
    @Override
    public boolean isRight() {
        return true;
    }
}
/**
 * Projects an {@link Either} onto its left side, exposing {@link Optional}-like
 * operations that only take effect when the underlying value is a {@link Left}.
 */
public static class LeftProjection<L, R> {

    private final Either<L, R> projected;

    LeftProjection(final Either<L, R> either) {
        Objects.requireNonNull(either, "either can't be null");
        this.projected = either;
    }

    /** @return {@code true} if the underlying {@link Either} is a {@link Left}. */
    public boolean exists() {
        return projected.isLeft();
    }

    /**
     * @return the left value.
     * @throws NoSuchElementException if the underlying {@link Either} is a {@link Right}.
     */
    public L get() {
        return projected.getLeft();
    }

    /** Maps the left value when present; otherwise carries the right value through unchanged. */
    public <LL> Either<LL, R> map(final Function<? super L, ? extends LL> fn) {
        return projected.isLeft()
            ? Either.left(fn.apply(projected.getLeft()))
            : Either.right(projected.getRight());
    }

    /** Binds {@code fn} over the left value when present; otherwise carries the right value through unchanged. */
    public <LL> Either<LL, R> flatMap(final Function<? super L, Either<LL, R>> fn) {
        return projected.isLeft()
            ? fn.apply(projected.getLeft())
            : Either.right(projected.getRight());
    }

    /** @return an {@link Optional} holding the left value, or empty for a {@link Right}. */
    public Optional<L> toOptional() {
        return exists() ? Optional.of(projected.getLeft()) : Optional.empty();
    }
}
/**
 * Projects an {@link Either} onto its right side, exposing {@link Optional}-like
 * operations that only take effect when the underlying value is a {@link Right}.
 */
public static class RightProjection<L, R> {

    private final Either<L, R> projected;

    RightProjection(final Either<L, R> either) {
        Objects.requireNonNull(either, "either can't be null");
        this.projected = either;
    }

    /** @return {@code true} if the underlying {@link Either} is a {@link Right}. */
    public boolean exists() {
        return projected.isRight();
    }

    /**
     * @return the right value.
     * @throws NoSuchElementException if the underlying {@link Either} is a {@link Left}.
     */
    public R get() {
        return projected.getRight();
    }

    /** Maps the right value when present; otherwise carries the left value through unchanged. */
    public <RR> Either<L, RR> map(final Function<? super R, ? extends RR> fn) {
        return projected.isRight()
            ? Either.right(fn.apply(projected.getRight()))
            : Either.left(projected.getLeft());
    }

    /** Binds {@code fn} over the right value when present; otherwise carries the left value through unchanged. */
    public <RR> Either<L, RR> flatMap(final Function<? super R, Either<L, RR>> fn) {
        return projected.isRight()
            ? fn.apply(projected.getRight())
            : Either.left(projected.getLeft());
    }

    /** @return an {@link Optional} holding the right value, or empty for a {@link Left}. */
    public Optional<R> toOptional() {
        return exists() ? Optional.of(projected.getRight()) : Optional.empty();
    }
}
}

View File

@@ -4,6 +4,7 @@ import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
@@ -118,6 +119,25 @@ public final class Enums {
));
}
/**
 * Converts an object to a list of a specific enum.
 * <p>
 * Accepted inputs: a {@code List} whose elements are already of the target enum type
 * (returned as-is), a {@code List} of strings (each matched case-insensitively against
 * the enum constants), a single enum constant, or a single string. An empty list yields
 * an empty list.
 *
 * @param value the object to convert to a list of enums.
 * @param enumClass the class of the enum to convert to.
 * @return A list of the corresponding enum type
 * @param <T> The type of the enum.
 * @throws IllegalArgumentException If the value does not match any enum value.
 */
public static <T extends Enum<T>> List<T> fromList(Object value, Class<T> enumClass) {
    return switch (value) {
        case List<?> list when !list.isEmpty() && enumClass.isInstance(list.getFirst()) -> {
            // NOTE(review): only the first element is checked, consistent with the
            // string-list branch below — a heterogeneous list would fail later.
            @SuppressWarnings("unchecked") // guarded by the isInstance check on the first element
            List<T> alreadyTyped = (List<T>) list;
            yield alreadyTyped;
        }
        case List<?> list when !list.isEmpty() && list.getFirst() instanceof String ->
            // Locale.ROOT keeps the matching locale-independent (avoids e.g. the Turkish dotless-i issue).
            list.stream().map(item -> Enum.valueOf(enumClass, item.toString().toUpperCase(Locale.ROOT))).collect(Collectors.toList());
        // An empty list trivially converts to an empty list instead of failing.
        case List<?> list when list.isEmpty() -> List.of();
        case Enum<?> enumValue when enumClass.isInstance(enumValue) -> List.of(enumClass.cast(enumValue));
        case String stringValue -> List.of(Enum.valueOf(enumClass, stringValue.toUpperCase(Locale.ROOT)));
        default -> throw new IllegalArgumentException("Field requires a " + enumClass.getSimpleName() + " or List<" + enumClass.getSimpleName() + "> value");
    };
}
private Enums() {
}
}

View File

@@ -55,4 +55,20 @@ public class ListUtils {
return newList;
}
/**
 * Casts the given object to a {@code List}.
 *
 * @param object the object expected to be a {@link List}.
 * @return the object as a {@code List<?>}.
 * @throws IllegalArgumentException if the object is {@code null} or not a {@link List}.
 */
public static List<?> convertToList(Object object){
    if (object instanceof List<?> list) {
        return list;
    } else if (object == null) {
        // A null would previously NPE on object.getClass(); fail with a clear message instead.
        throw new IllegalArgumentException("null is not an instance of List");
    } else {
        throw new IllegalArgumentException("%s is not an instance of List".formatted(object.getClass()));
    }
}
/**
 * Casts the given object to a {@code List<String>}.
 * <p>
 * Only the first element is inspected, so a heterogeneous list whose first element
 * is a {@code String} passes this check; later elements are not validated.
 *
 * @param object the object expected to be a {@link List} of {@link String}.
 * @return the object as a {@code List<String>}.
 * @throws IllegalArgumentException if the object is not a list of strings.
 */
@SuppressWarnings("unchecked") // guarded by the instanceof check on the first element
public static List<String> convertToListString(Object object){
    if (object instanceof List<?> list && (list.isEmpty() || list.getFirst() instanceof String)) {
        return (List<String>) list;
    } else {
        throw new IllegalArgumentException("%s is not an instance of List of String".formatted(object));
    }
}
}

View File

@@ -169,7 +169,7 @@ public class MapUtils {
}
/**
* Utility method nested a flattened map.
* Utility method that nests a flattened map.
*
* @param flatMap the flattened map.
* @return the nested map.
@@ -203,4 +203,44 @@ public class MapUtils {
}
return result;
}
/**
 * Utility method that flattens a nested map.
 * <p>
 * NOTE: for simplicity, this method doesn't allow flattening maps with conflicting keys
 * that would end up in different flattened keys; this could be relaxed later if needed.
 * I.e. flattening {k1: {k2: v1, k4: v2}} to {k1.k2: v1, k1.k4: v2} is prohibited for now.
 *
 * @param nestedMap the nested map.
 * @return the flattened map (sorted by key, as a {@code TreeMap} is used internally).
 *
 * @throws IllegalArgumentException if any nested entry is a map of more than one element.
 */
public static Map<String, Object> nestedToFlattenMap(@NotNull Map<String, Object> nestedMap) {
    Map<String, Object> result = new TreeMap<>();
    for (Map.Entry<String, Object> entry : nestedMap.entrySet()) {
        // An empty map cannot be flattened into a "key.subKey" entry: keep it as-is.
        if (entry.getValue() instanceof Map<?, ?> map && !map.isEmpty()) {
            @SuppressWarnings("unchecked") // nested values are maps of the same String-keyed shape
            Map<String, Object> nested = (Map<String, Object>) map;
            Map.Entry<String, Object> flatten = flattenEntry(entry.getKey(), nested);
            result.put(flatten.getKey(), flatten.getValue());
        } else {
            result.put(entry.getKey(), entry.getValue());
        }
    }
    return result;
}
/**
 * Recursively flattens a single-entry nested map into one dotted-key entry,
 * e.g. ("k1", {k2: {k3: v}}) becomes ("k1.k2.k3", v).
 *
 * @param key   the flattened key accumulated so far.
 * @param value the nested map to flatten; must hold at most one entry.
 * @return the fully flattened entry.
 * @throws IllegalArgumentException if the map contains more than one element.
 */
private static Map.Entry<String, Object> flattenEntry(String key, Map<String, Object> value) {
    if (value.size() > 1) {
        throw new IllegalArgumentException("You cannot flatten a map with an entry that is a map of more than one element, conflicting key: " + key);
    }
    // An empty map has nothing to flatten: keep it as the value under the current key.
    // (Previously this crashed with NoSuchElementException on iterator().next().)
    if (value.isEmpty()) {
        return Map.entry(key, value);
    }
    Map.Entry<String, Object> entry = value.entrySet().iterator().next();
    String newKey = key + "." + entry.getKey();
    Object newValue = entry.getValue();
    if (newValue instanceof Map<?, ?> map) {
        @SuppressWarnings("unchecked") // nested values are maps of the same String-keyed shape
        Map<String, Object> nested = (Map<String, Object>) map;
        return flattenEntry(newKey, nested);
    } else {
        return Map.entry(newKey, newValue);
    }
}
}

View File

@@ -0,0 +1,77 @@
package io.kestra.core.validations;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.env.Environment;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import java.io.Serial;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.List;
/**
 * Enforces validation rules upon the application configuration.
 */
@Slf4j
@Context
public class AppConfigValidator {

    private static final String KESTRA_URL_KEY = "kestra.url";

    private final Environment environment;

    @Inject
    public AppConfigValidator(Environment environment) {
        this.environment = environment;
    }

    /**
     * Runs all configuration checks once this bean is constructed, failing
     * application startup when any of them reports an invalid value.
     */
    @PostConstruct
    void validate() {
        boolean allValid = isKestraUrlValid();
        if (!allValid) {
            throw new AppConfigException("Invalid configuration");
        }
    }

    /**
     * Validates the optional {@code kestra.url} property: when present, it must be
     * a well-formed HTTP or HTTPS URL.
     *
     * @return {@code true} if the property is absent or valid, {@code false} otherwise.
     */
    private boolean isKestraUrlValid() {
        if (!environment.containsProperty(KESTRA_URL_KEY)) {
            // Nothing to validate when the property is not configured.
            return true;
        }

        final String configuredValue = environment.getProperty(KESTRA_URL_KEY, String.class).orElseThrow();

        final URL parsedUrl;
        try {
            parsedUrl = URI.create(configuredValue).toURL();
        } catch (IllegalArgumentException | MalformedURLException e) {
            log.error(
                "Value of the '{}' configuration property must be a valid URL - e.g. https://your.company.com",
                KESTRA_URL_KEY
            );
            return false;
        }

        final String scheme = parsedUrl.getProtocol();
        if (!("http".equals(scheme) || "https".equals(scheme))) {
            log.error(
                "Value of the '{}' configuration property must contain either HTTP or HTTPS scheme - e.g. https://your.company.com",
                KESTRA_URL_KEY
            );
            return false;
        }

        return true;
    }

    /** Thrown when the application configuration fails validation. */
    public static class AppConfigException extends RuntimeException {

        @Serial
        private static final long serialVersionUID = 1L;

        public AppConfigException(String errorMessage) {
            super(errorMessage);
        }
    }
}

View File

@@ -54,9 +54,10 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it.");
}
List<Task> allTasks = value.allTasksWithChilds();
// tasks unique id
List<String> taskIds = value.allTasksWithChilds()
.stream()
List<String> taskIds = allTasks.stream()
.map(Task::getId)
.toList();
@@ -72,8 +73,8 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
violations.add("Duplicate trigger id with name [" + String.join(", ", duplicateIds) + "]");
}
value.allTasksWithChilds()
.stream().filter(task -> task instanceof ExecutableTask<?> executableTask
allTasks.stream()
.filter(task -> task instanceof ExecutableTask<?> executableTask
&& value.getId().equals(executableTask.subflowId().flowId())
&& value.getNamespace().equals(executableTask.subflowId().namespace()))
.forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));
@@ -102,7 +103,7 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
.map(input -> Pattern.compile("\\{\\{\\s*inputs." + input.getId() + "\\s*\\}\\}"))
.collect(Collectors.toList());
List<String> invalidTasks = value.allTasks()
List<String> invalidTasks = allTasks.stream()
.filter(task -> checkObjectFieldsWithPatterns(task, inputsWithMinusPatterns))
.map(task -> task.getId())
.collect(Collectors.toList());
@@ -112,12 +113,12 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
" [" + String.join(", ", invalidTasks) + "]");
}
List<Pattern> outputsWithMinusPattern = value.allTasks()
List<Pattern> outputsWithMinusPattern = allTasks.stream()
.filter(output -> Optional.ofNullable(output.getId()).orElse("").contains("-"))
.map(output -> Pattern.compile("\\{\\{\\s*outputs\\." + output.getId() + "\\.[^}]+\\s*\\}\\}"))
.collect(Collectors.toList());
invalidTasks = value.allTasks()
invalidTasks = allTasks.stream()
.filter(task -> checkObjectFieldsWithPatterns(task, outputsWithMinusPattern))
.map(task -> task.getId())
.collect(Collectors.toList());

View File

@@ -19,7 +19,6 @@ import lombok.experimental.SuperBuilder;
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@EqualsAndHashCode
//@TriggersDataFilterValidation
@Schema(
title = "Display Execution data in a dashboard chart.",
description = "Execution data can be displayed in charts broken out by Namespace and filtered by State, for example."

View File

@@ -111,8 +111,9 @@ public class Labels extends Task implements ExecutionUpdatableTask {
})
).collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue
));
Map.Entry::getValue,
(first, second) -> second)
);
} else if (labels instanceof Map<?, ?> map) {
labelsAsMap = map.entrySet()
.stream()

View File

@@ -208,48 +208,50 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
return Optional.empty();
}
boolean isOutputsAllowed = runContext
.<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
.orElse(true);
final Output.OutputBuilder builder = Output.builder()
.executionId(execution.getId())
.state(execution.getState().getCurrent());
final Map<String, Object> subflowOutputs = Optional
.ofNullable(flow.getOutputs())
.map(outputs -> outputs
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
)
)
.orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
VariablesService variablesService = ((DefaultRunContext) runContext).getApplicationContext().getBean(VariablesService.class);
if (subflowOutputs != null) {
try {
Map<String, Object> outputs = runContext.render(subflowOutputs);
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
if (flow.getOutputs() != null && flowInputOutput != null) {
outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
}
builder.outputs(outputs);
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
var state = State.Type.fail(this);
Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
taskRun = taskRun
.withState(state)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
.withOutputs(variables);
if (this.wait) { // we only compute outputs if we wait for the subflow
boolean isOutputsAllowed = runContext
.<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
.orElse(true);
return Optional.of(SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(State.Type.FAILED)
.parentTaskRun(taskRun)
.build());
final Map<String, Object> subflowOutputs = Optional
.ofNullable(flow.getOutputs())
.map(outputs -> outputs
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
)
)
.orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
if (subflowOutputs != null) {
try {
Map<String, Object> outputs = runContext.render(subflowOutputs);
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
if (flow.getOutputs() != null && flowInputOutput != null) {
outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
}
builder.outputs(outputs);
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
var state = State.Type.fail(this);
Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
taskRun = taskRun
.withState(state)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
.withOutputs(variables);
return Optional.of(SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(State.Type.FAILED)
.parentTaskRun(taskRun)
.build());
}
}
}

View File

@@ -9,11 +9,11 @@ import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.codehaus.commons.nullanalysis.NotNull;
import java.util.NoSuchElementException;

View File

@@ -0,0 +1,11 @@
<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
<g clip-path="url(#clip0_1765_9330)">
<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
</g>
</svg>

After

Width:  |  Height:  |  Size: 2.1 KiB

View File

@@ -0,0 +1,11 @@
<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
<g clip-path="url(#clip0_1765_9330)">
<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
</g>
</svg>

After

Width:  |  Height:  |  Size: 2.1 KiB

View File

@@ -112,7 +112,7 @@ class JsonSchemaGeneratorTest {
var requiredWithDefault = definitions.get("io.kestra.core.docs.JsonSchemaGeneratorTest-RequiredWithDefault");
assertThat(requiredWithDefault, is(notNullValue()));
assertThat((List<String>) requiredWithDefault.get("required"), not(contains("requiredWithDefault")));
assertThat((List<String>) requiredWithDefault.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
var properties = (Map<String, Map<String, Object>>) flow.get("properties");
var listeners = properties.get("listeners");
@@ -253,7 +253,7 @@ class JsonSchemaGeneratorTest {
void requiredAreRemovedIfThereIsADefault() {
Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, RequiredWithDefault.class);
assertThat(generate, is(not(nullValue())));
assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault")));
assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
assertThat((List<String>) generate.get("required"), containsInAnyOrder("requiredWithNoDefault"));
}
@@ -466,6 +466,11 @@ class JsonSchemaGeneratorTest {
@Builder.Default
private Property<TaskWithEnum.TestClass> requiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test").build());
@PluginProperty
@NotNull
@Builder.Default
private Property<TaskWithEnum.TestClass> anotherRequiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test2").build());
@PluginProperty
@NotNull
private Property<TaskWithEnum.TestClass> requiredWithNoDefault;

View File

@@ -1,11 +1,12 @@
package io.kestra.core.models;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class LabelTest {
@Test
@@ -15,9 +16,8 @@ class LabelTest {
new Label(Label.CORRELATION_ID, "id"))
);
Assertions.assertEquals(
Map.of("system", Map.of("username", "test", "correlationId", "id")),
result
assertThat(result).isEqualTo(
Map.of("system", Map.of("username", "test", "correlationId", "id"))
);
}
@@ -29,9 +29,48 @@ class LabelTest {
new Label(Label.CORRELATION_ID, "id"))
);
Assertions.assertEquals(
Map.of("system", Map.of("username", "test1", "correlationId", "id")),
result
assertThat(result).isEqualTo(
Map.of("system", Map.of("username", "test2", "correlationId", "id"))
);
}
@Test
void shouldGetMapGivenDistinctLabels() {
    // With distinct keys, toMap is a straightforward key -> value mapping.
    Map<String, String> result = Label.toMap(List.of(
        new Label(Label.USERNAME, "test"),
        new Label(Label.CORRELATION_ID, "id"))
    );

    assertThat(result).isEqualTo(
        Map.of(Label.USERNAME, "test", Label.CORRELATION_ID, "id")
    );
}
@Test
void shouldGetMapGivenDuplicateLabels() {
    // With a duplicated key, the expected map keeps the last value ("test2" wins over "test1").
    Map<String, String> result = Label.toMap(List.of(
        new Label(Label.USERNAME, "test1"),
        new Label(Label.USERNAME, "test2"),
        new Label(Label.CORRELATION_ID, "id"))
    );

    assertThat(result).isEqualTo(
        Map.of(Label.USERNAME, "test2", Label.CORRELATION_ID, "id")
    );
}
@Test
void shouldDeduplicateLabelsWithKeyOrderKept() {
    // Renamed from shouldDuplicateLabelsWithKeyOrderKept: the test exercises
    // deduplication, not duplication. Per the expected result below, each key's
    // last value is kept — NOTE(review): USERNAME still precedes CORRELATION_ID,
    // so ordering appears to follow each key's first occurrence; confirm against
    // Label.deduplicate's implementation.
    List<Label> result = Label.deduplicate(List.of(
        new Label(Label.USERNAME, "test1"),
        new Label(Label.USERNAME, "test2"),
        new Label(Label.CORRELATION_ID, "id"),
        new Label(Label.USERNAME, "test3"))
    );

    assertThat(result).containsExactly(
        new Label(Label.USERNAME, "test3"),
        new Label(Label.CORRELATION_ID, "id")
    );
}
}

View File

@@ -94,6 +94,14 @@ public class QueryFilterTest {
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.ENDS_WITH).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.CONTAINS).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.EQUALS).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.NOT_EQUALS).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.IN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.NOT_IN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.STARTS_WITH).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.ENDS_WITH).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.CONTAINS).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.NOT_EQUALS).build(), Resource.EXECUTION),
@@ -204,6 +212,13 @@ public class QueryFilterTest {
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.REGEX).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.PREFIX).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.GREATER_THAN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.LESS_THAN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.REGEX).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.PREFIX).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.GREATER_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.LESS_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),

View File

@@ -2,6 +2,7 @@ package io.kestra.core.models.executions;
import io.kestra.core.models.Label;
import io.kestra.core.utils.IdUtils;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.flows.State;
@@ -157,7 +158,58 @@ class ExecutionTest {
.labels(List.of(new Label("test", "test-value")))
.build();
assertThat(execution.getLabels().size()).isEqualTo(1);
assertThat(execution.getLabels().getFirst()).isEqualTo(new Label("test", "test-value"));
assertThat(execution.getLabels()).containsExactly(new Label("test", "test-value"));
}
@Test
void labelsGetDeduplicated() {
    // Two labels sharing the same key: deduplication keeps the last one.
    final List<Label> duplicatedLabels = List.of(
        new Label("test", "value1"),
        new Label("test", "value2")
    );

    // withLabels(...) is expected to deduplicate the provided labels.
    final Execution executionWithLabels = Execution.builder()
        .build()
        .withLabels(duplicatedLabels);
    assertThat(executionWithLabels.getLabels()).containsExactly(new Label("test", "value2"));

    // The builder's labels(...) setter is expected to deduplicate as well.
    final Execution executionBuilder = Execution.builder()
        .labels(duplicatedLabels)
        .build();
    assertThat(executionBuilder.getLabels()).containsExactly(new Label("test", "value2"));
}
@Test
@Disabled("Solve label deduplication on instantiation")
void labelsGetDeduplicatedOnNewInstance() {
    // Fixed typo in the @Disabled reason ("instantization" -> "instantiation").
    // This pins the DESIRED behavior: labels passed to the all-args constructor
    // should be deduplicated too — disabled until that is implemented.
    final List<Label> duplicatedLabels = List.of(
        new Label("test", "value1"),
        new Label("test", "value2")
    );

    final Execution executionNew = new Execution(
        "foo",
        "id",
        "namespace",
        "flowId",
        1,
        Collections.emptyList(),
        Map.of(),
        Map.of(),
        duplicatedLabels,
        Map.of(),
        State.of(State.Type.SUCCESS, Collections.emptyList()),
        "parentId",
        "originalId",
        null,
        false,
        null,
        null,
        null,
        null,
        null,
        null
    );

    assertThat(executionNew.getLabels()).containsExactly(new Label("test", "value2"));
}
}

View File

@@ -44,6 +44,7 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.USER;
@@ -198,6 +199,7 @@ public abstract class AbstractExecutionRepositoryTest {
return Stream.of(
QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.WORKER_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build()
@@ -740,4 +742,16 @@ public abstract class AbstractExecutionRepositoryTest {
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
assertThat(executions.size()).isEqualTo(0L);
}
@Test
protected void shouldReturnLastExecutionsWhenInputsAreNull() {
    inject();

    // A null flow filter should fall back to returning last executions for all flows.
    List<Execution> latest = executionRepository.lastExecutions(MAIN_TENANT, null);
    assertThat(latest).isNotEmpty();

    // Exactly one "last execution" per flow: flow ids must be pairwise distinct.
    Set<String> distinctFlowIds = latest.stream()
        .map(Execution::getFlowId)
        .collect(Collectors.toSet());
    assertThat(distinctFlowIds).hasSameSizeAs(latest);
}
}

View File

@@ -160,6 +160,7 @@ public abstract class AbstractFlowRepositoryTest {
QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("executionTriggerId").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.WORKER_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),

View File

@@ -34,6 +34,7 @@ import static io.kestra.core.models.flows.FlowScope.SYSTEM;
import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatReflectiveOperationException;
import static org.junit.jupiter.api.Assertions.assertThrows;
@KestraTest
@@ -42,11 +43,15 @@ public abstract class AbstractLogRepositoryTest {
protected LogRepositoryInterface logRepository;
// Convenience overload that generates a random execution id for the log entry.
protected static LogEntry.LogEntryBuilder logEntry(Level level) {
    return logEntry(level, IdUtils.create());
}
protected static LogEntry.LogEntryBuilder logEntry(Level level, String executionId) {
return LogEntry.builder()
.flowId("flowId")
.namespace("io.kestra.unittest")
.taskId("taskId")
.executionId(IdUtils.create())
.executionId(executionId)
.taskRunId(IdUtils.create())
.attemptNumber(0)
.timestamp(Instant.now())
@@ -60,13 +65,36 @@ public abstract class AbstractLogRepositoryTest {
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){
logRepository.save(logEntry(Level.INFO).build());
logRepository.save(logEntry(Level.INFO, "executionId").build());
ArrayListTotal<LogEntry> entries = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
assertThat(entries).hasSize(1);
}
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_async(QueryFilter filter){
    // The saved entry (executionId "executionId") is expected to match every filter
    // produced by filterCombinations.
    logRepository.save(logEntry(Level.INFO, "executionId").build());

    // findAsync streams results as a Flux; blocking on collectList() is fine in tests.
    Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, List.of(filter));
    List<LogEntry> logEntries = find.collectList().block();
    assertThat(logEntries).hasSize(1);
}
@ParameterizedTest
@MethodSource("filterCombinations")
void should_delete_with_filter(QueryFilter filter){
    // The saved entry (executionId "executionId") is expected to match every filter
    // produced by filterCombinations.
    logRepository.save(logEntry(Level.INFO, "executionId").build());

    // Deleting with a matching filter should leave the repository empty.
    logRepository.deleteByFilters(MAIN_TENANT, List.of(filter));
    assertThat(logRepository.findAllAsync(MAIN_TENANT).collectList().block()).isEmpty();
}
static Stream<QueryFilter> filterCombinations() {
return Stream.of(
QueryFilter.builder().field(Field.QUERY).value("flowId").operation(Op.EQUALS).build(),
@@ -105,6 +133,13 @@ public abstract class AbstractLogRepositoryTest {
QueryFilter.builder().field(Field.TRIGGER_ID).value("Id").operation(Op.ENDS_WITH).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value(List.of("triggerId")).operation(Op.IN).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value(List.of("anotherId")).operation(Op.NOT_IN).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("executionId").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("anotherId").operation(Op.NOT_EQUALS).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("xecution").operation(Op.CONTAINS).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("execution").operation(Op.STARTS_WITH).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("Id").operation(Op.ENDS_WITH).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value(List.of("executionId")).operation(Op.IN).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value(List.of("anotherId")).operation(Op.NOT_IN).build(),
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.ERROR).operation(Op.NOT_EQUALS).build()
);
@@ -284,32 +319,6 @@ public abstract class AbstractLogRepositoryTest {
assertThat(find.size()).isZero();
}
@Test
// Exercises the namespace / minimum-level / start-date variant of findAsync.
void findAsync() {
    logRepository.save(logEntry(Level.INFO).build());
    logRepository.save(logEntry(Level.ERROR).build());
    logRepository.save(logEntry(Level.WARN).build());
    logRepository.save(logEntry(Level.INFO).executionKind(ExecutionKind.TEST).build()); // should not be visible here
    ZonedDateTime startDate = ZonedDateTime.now().minusSeconds(1);

    // matching namespace + min level INFO: the 3 non-TEST entries match
    Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unittest", Level.INFO, startDate);
    List<LogEntry> logEntries = find.collectList().block();
    assertThat(logEntries).hasSize(3);

    // null namespace with min level ERROR: only the ERROR entry matches
    find = logRepository.findAsync(MAIN_TENANT, null, Level.ERROR, startDate);
    logEntries = find.collectList().block();
    assertThat(logEntries).hasSize(1);

    // non-existing namespace: nothing matches
    find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unused", Level.INFO, startDate);
    logEntries = find.collectList().block();
    assertThat(logEntries).hasSize(0);

    // start date in the future: nothing matches
    find = logRepository.findAsync(MAIN_TENANT, null, Level.INFO, startDate.plusSeconds(2));
    logEntries = find.collectList().block();
    assertThat(logEntries).hasSize(0);
}
@Test
void findAllAsync() {
logRepository.save(logEntry(Level.INFO).build());

View File

@@ -101,6 +101,7 @@ public abstract class AbstractTriggerRepositoryTest {
QueryFilter.builder().field(Field.STATE).value(State.Type.RUNNING).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build()

View File

@@ -417,6 +417,18 @@ public abstract class AbstractRunnerTest {
flowConcurrencyCaseTest.flowConcurrencyCancelPause();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"})
// Delegates to the shared concurrency case test. NOTE(review): declared
// 'protected' — presumably so runner subclasses can override/disable it; confirm.
protected void flowConcurrencyWithForEachItem() throws Exception {
    flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
// Regression coverage for the concurrency-limit bug when restarting a queued
// execution (delegates to the shared case test).
void concurrencyQueueRestarted() throws Exception {
    flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
}
@Test
@ExecuteFlow("flows/valids/executable-fail.yml")
void badExecutable(Execution execution) {

View File

@@ -8,18 +8,29 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import reactor.core.publisher.Flux;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@@ -28,7 +39,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Singleton
public class FlowConcurrencyCaseTest {
@Inject
private RunnerUtils runnerUtils;
private StorageInterface storageInterface;
@Inject
protected RunnerUtils runnerUtils;
@Inject
private FlowInputOutput flowIO;
@Inject
private FlowRepositoryInterface flowRepository;
@@ -37,6 +54,9 @@ public class FlowConcurrencyCaseTest {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
private ExecutionService executionService;
public void flowConcurrencyCancel() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel");
@@ -237,4 +257,109 @@ public class FlowConcurrencyCaseTest {
assertThat(secondExecutionResult.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(secondExecutionResult.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.CANCELLED);
}
// Verifies that child executions spawned by a ForEachItem task respect the
// concurrency QUEUE behaviour of the target flow: only one child of
// 'flow-concurrency-queue' may be RUNNING at a time, and the parent still
// finishes in SUCCESS once every queued child has run.
public void flowConcurrencyWithForEachItem() throws TimeoutException, QueueException, InterruptedException, URISyntaxException, IOException {
    URI file = storageUpload();
    Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
    Execution forEachItem = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-for-each-item", null,
        (flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs), Duration.ofSeconds(5));
    assertThat(forEachItem.getState().getCurrent()).isEqualTo(Type.RUNNING);

    // collect the ids of child executions observed in RUNNING state
    Set<String> executionIds = new HashSet<>();
    Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
        if ("flow-concurrency-queue".equals(e.getLeft().getFlowId()) && e.getLeft().getState().isRunning()) {
            executionIds.add(e.getLeft().getId());
        }
    });
    // wait a little to be sure there are not too many executions started
    Thread.sleep(500);
    // only one child should have been allowed to run within the window
    assertThat(executionIds).hasSize(1);
    receive.blockLast();
    Execution terminated = runnerUtils.awaitExecution(e -> e.getId().equals(forEachItem.getId()) && e.getState().isTerminated(), () -> {}, Duration.ofSeconds(10));
    assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
}
// Scenario: two executions of a flow with concurrency behaviour QUEUE whose
// task always fails. The second execution is queued behind the first; once the
// first FAILS, the second starts. The failed first execution is then restarted
// and must be queued again (not run concurrently) before failing a second time.
// Regression test for the concurrency limit being exceeded on restart.
public void flowConcurrencyQueueRestarted() throws Exception {
    Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
    Flow flow = flowRepository
        .findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", Optional.empty())
        .orElseThrow();
    Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
    executionQueue.emit(execution2);

    assertThat(execution1.getState().isRunning()).isTrue();
    assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);

    var executionResult1 = new AtomicReference<Execution>();
    var executionResult2 = new AtomicReference<Execution>();
    // latch1 counts two FAILED transitions of execution1: the initial failure
    // and the failure after restart
    CountDownLatch latch1 = new CountDownLatch(2);
    AtomicReference<Execution> failedExecution = new AtomicReference<>();
    // latch2: execution2 reached RUNNING; latch3: execution2 reached FAILED
    CountDownLatch latch2 = new CountDownLatch(1);
    CountDownLatch latch3 = new CountDownLatch(1);
    Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
        if (e.getLeft().getId().equals(execution1.getId())) {
            executionResult1.set(e.getLeft());
            if (e.getLeft().getState().getCurrent() == Type.FAILED) {
                failedExecution.set(e.getLeft());
                latch1.countDown();
            }
        }
        if (e.getLeft().getId().equals(execution2.getId())) {
            executionResult2.set(e.getLeft());
            if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
                latch2.countDown();
            }
            if (e.getLeft().getState().getCurrent() == Type.FAILED) {
                latch3.countDown();
            }
        }
    });

    assertTrue(latch2.await(1, TimeUnit.MINUTES));
    assertThat(failedExecution.get()).isNotNull();

    // here the first fail and the second is now running.
    // we restart the first one, it should be queued then fail again.
    Execution restarted = executionService.restart(failedExecution.get(), null);
    executionQueue.emit(restarted);

    assertTrue(latch3.await(1, TimeUnit.MINUTES));
    assertTrue(latch1.await(1, TimeUnit.MINUTES));
    receive.blockLast();

    assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(Type.FAILED);
    // it should have been queued after restarted
    assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED)).isTrue();
    assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.QUEUED)).isTrue();

    // execution2 must have gone CREATED -> QUEUED -> RUNNING before failing
    assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(Type.FAILED);
    assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
    assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
    assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
// Uploads a small fixture file to the internal storage and returns its URI.
// Fix: the original leaked the FileInputStream (never closed) and left the
// temp file behind after the run; close via try-with-resources and register
// the temp file for deletion on JVM exit.
private URI storageUpload() throws URISyntaxException, IOException {
    File tempFile = File.createTempFile("file", ".txt");
    tempFile.deleteOnExit();
    Files.write(tempFile.toPath(), content());
    // NOTE(review): assumes storageInterface.put() fully consumes the stream
    // before returning — confirm before relying on close-after-put.
    try (FileInputStream in = new FileInputStream(tempFile)) {
        return storageInterface.put(
            MAIN_TENANT,
            null,
            new URI("/file/storage/file.txt"),
            in
        );
    }
}
// Builds the fixture payload: seven lines, each holding the line number
// left-padded with spaces to a width of 20 characters.
private List<String> content() {
    List<String> lines = new ArrayList<>();
    for (int i = 0; i < 7; i++) {
        lines.add(StringUtils.leftPad(String.valueOf(i), 20));
    }
    // copyOf keeps the unmodifiable-list contract of the previous implementation
    return List.copyOf(lines);
}
}

View File

@@ -273,7 +273,7 @@ public class RestartCaseTest {
// wait
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
execution -> execution.getState().getCurrent() == State.Type.SUCCESS && execution.getId().equals(firstExecution.getId()),
execution -> executionService.isTerminated(flow, execution) && execution.getState().isSuccess() && execution.getId().equals(firstExecution.getId()),
throwRunnable(() -> {
Execution restartedExec = executionService.restart(firstExecution, null);
assertThat(restartedExec).isNotNull();

View File

@@ -9,6 +9,7 @@ import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;
@@ -109,7 +110,8 @@ class RunContextLoggerTest {
logger.info("test myawesomepassmyawesomepass myawesomepass myawesomepassmyawesomepass");
logger.warn("test {}", URI.create("http://it-s.secret"));
matchingLog = TestsUtils.awaitLogs(logs, 3);
// the 3 logs will create 4 log entries as exceptions stacktraces are logged separately at the TRACE level
matchingLog = TestsUtils.awaitLogs(logs, 4);
receive.blockLast();
assertThat(matchingLog.stream().filter(logEntry -> logEntry.getLevel().equals(Level.DEBUG)).findFirst().orElseThrow().getMessage()).isEqualTo("test john@****** test");
assertThat(matchingLog.stream().filter(logEntry -> logEntry.getLevel().equals(Level.TRACE)).findFirst().orElseThrow().getMessage()).contains("exception from doe.com");

View File

@@ -5,6 +5,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.queues.QueueException;
@@ -18,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -77,8 +79,12 @@ public class TaskCacheTest {
@Plugin
public static class CounterTask extends Task implements RunnableTask<CounterTask.Output> {
private String workingDir;
@Override
public Output run(RunContext runContext) throws Exception {
Map<String, Object> variables = Map.of("workingDir", runContext.workingDir().path().toString());
runContext.render(this.workingDir, variables);
return Output.builder()
.counter(COUNTER.incrementAndGet())
.build();

View File

@@ -71,6 +71,8 @@ class DateFilterTest {
{{ "2013-09-08T17:19:12+02:00" | date(timeZone="Europe/Paris") }}
{{ "2013-09-08T17:19:12" | date(timeZone="Europe/Paris") }}
{{ "2013-09-08" | date(timeZone="Europe/Paris") }}
{{ "08.09.2023" | date("yyyy-MM-dd", existingFormat="dd.MM.yyyy") }}
{{ "08092023" | date("yyyy-MM-dd", existingFormat="ddMMyyyy") }}
""",
Map.of()
);
@@ -80,6 +82,8 @@ class DateFilterTest {
2013-09-08T17:19:12.000000+02:00
2013-09-08T17:19:12.000000+02:00
2013-09-08T00:00:00.000000+02:00
2023-09-08
2023-09-08
""");
}
@@ -171,7 +175,9 @@ class DateFilterTest {
render = variableRenderer.render("{{ now(format=\"sql_milli\") }}", ImmutableMap.of());
assertThat(render).isEqualTo(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
// a millisecond can pass between the render and now so we can't assert on a precise to millisecond date
assertThat(render).startsWith(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
assertThat(render).hasSize(23);
}
@Test
@@ -185,4 +191,41 @@ class DateFilterTest {
assertThat(render).isEqualTo("2013-09-07T17:19:12.123456+02:00");
}
@Test
// Verifies the date filter accepts numeric epoch timestamps at several
// precisions (seconds, milliseconds, nanoseconds) and renders them with the
// requested named formats and time zones.
// NOTE(review): the filter presumably infers the epoch precision from the
// magnitude of the number — confirm against the filter implementation.
void timestampDateFormat() throws IllegalVariableEvaluationException {
    String render =
        variableRenderer.render(
            """
            {{ 1378653552 | date(format="iso_sec", timeZone="Europe/Paris") }}
            {{ 1378653552123 | date(format="iso_milli", timeZone="Europe/Paris") }}
            {{ 1378653552123 | date(timeZone="Europe/Paris") }}
            {{ 1378653552123 | date(format="iso_zoned_date_time", timeZone="Europe/Paris") }}
            {{ 1378653552123456000 | date(format="iso", timeZone="Europe/Paris") }}
            {{ 1378653552000123456 | date(format="iso", timeZone="Europe/Paris") }}
            {{ 1378653552 | date(format="sql_sec", timeZone="Europe/Paris") }}
            {{ 1378653552123 | date(format="sql_milli", timeZone="Europe/Paris") }}
            {{ 1378653552123456000 | date(format="sql", timeZone="Europe/Paris") }}
            {{ 1378653552000123456 | date(format="sql", timeZone="Europe/Paris") }}
            {{ 1378653552123 | date(format="sql_milli", timeZone="UTC") }}
            {{ "1378653552123" | number | date(format="sql_milli", timeZone="UTC") }}
            """,
            Map.of());
    assertThat(render).isEqualTo("""
        2013-09-08T17:19:12+02:00
        2013-09-08T17:19:12.123+02:00
        2013-09-08T17:19:12.123000+02:00
        2013-09-08T17:19:12.123+02:00[Europe/Paris]
        2013-09-08T17:19:12.123456+02:00
        2013-09-08T17:19:12.123456+02:00
        2013-09-08 17:19:12
        2013-09-08 17:19:12.123
        2013-09-08 17:19:12.123456
        2013-09-08 17:19:12.123456
        2013-09-08 15:19:12.123
        2013-09-08 15:19:12.123
        """);
}
}

View File

@@ -372,4 +372,44 @@ class FlowServiceTest {
assertThat(exceptions.size()).isZero();
}
@Test
// Validation must warn when runnable-only properties (timeout, taskCache,
// workerGroup) are set on a flowable task (ForEach), while the same
// properties on the nested runnable Log task stay warning-free.
void shouldReturnValidationForRunnablePropsOnFlowable() {
    // Given
    String source = """
        id: dolphin_164914
        namespace: company.team

        tasks:
          - id: for
            type: io.kestra.plugin.core.flow.ForEach
            values: [1, 2, 3]
            workerGroup:
              key: toto
            timeout: PT10S
            taskCache:
              enabled: true
            tasks:
              - id: hello
                type: io.kestra.plugin.core.log.Log
                message: Hello World! 🚀
                workerGroup:
                  key: toto
                timeout: PT10S
                taskCache:
                  enabled: true
        """;
    // When
    List<ValidateConstraintViolation> results = flowService.validate("my-tenant", source);
    // Then
    assertThat(results).hasSize(1);
    // exactly one warning per runnable-only property set on the flowable task
    assertThat(results.getFirst().getWarnings()).hasSize(3);
    assertThat(results.getFirst().getWarnings()).containsExactlyInAnyOrder(
        "The task 'for' cannot use the 'timeout' property as it's only relevant for runnable tasks.",
        "The task 'for' cannot use the 'taskCache' property as it's only relevant for runnable tasks.",
        "The task 'for' cannot use the 'workerGroup' property as it's only relevant for runnable tasks."
    );
}
}

View File

@@ -0,0 +1,462 @@
package io.kestra.core.utils;

import org.junit.jupiter.api.Test;

import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Function;

import static org.assertj.core.api.Assertions.*;

/**
 * Unit tests for {@code Either}, a right-biased disjoint union of two values.
 * Covers construction ({@code left}/{@code right}, including null payloads),
 * accessors and their failure modes, {@code fold}, and both
 * {@code LeftProjection} and {@code RightProjection}
 * ({@code exists}/{@code get}/{@code map}/{@code flatMap}/{@code toOptional}),
 * plus chained transformations.
 */
class EitherTest {

    @Test
    void shouldCreateLeftInstance() {
        // Given
        String leftValue = "error";
        // When
        Either<String, Integer> either = Either.left(leftValue);
        // Then
        assertThat(either).isInstanceOf(Either.Left.class);
        assertThat(either.isLeft()).isTrue();
        assertThat(either.isRight()).isFalse();
        assertThat(either.getLeft()).isEqualTo(leftValue);
    }

    @Test
    void shouldCreateRightInstance() {
        // Given
        Integer rightValue = 42;
        // When
        Either<String, Integer> either = Either.right(rightValue);
        // Then
        assertThat(either).isInstanceOf(Either.Right.class);
        assertThat(either.isRight()).isTrue();
        assertThat(either.isLeft()).isFalse();
        assertThat(either.getRight()).isEqualTo(rightValue);
    }

    @Test
    void shouldCreateLeftWithNullValue() {
        // When
        Either<String, Integer> either = Either.left(null);
        // Then
        assertThat(either.isLeft()).isTrue();
        assertThat(either.getLeft()).isNull();
    }

    @Test
    void shouldCreateRightWithNullValue() {
        // When
        Either<String, Integer> either = Either.right(null);
        // Then
        assertThat(either.isRight()).isTrue();
        assertThat(either.getRight()).isNull();
    }

    @Test
    void leftShouldReturnCorrectValues() {
        // Given
        String leftValue = "error message";
        Either<String, Integer> either = Either.left(leftValue);
        // Then
        assertThat(either.isLeft()).isTrue();
        assertThat(either.isRight()).isFalse();
        assertThat(either.getLeft()).isEqualTo(leftValue);
    }

    @Test
    void leftShouldThrowExceptionWhenGettingRightValue() {
        // Given
        Either<String, Integer> either = Either.left("error");
        // When/Then
        assertThatThrownBy(either::getRight)
            .isInstanceOf(NoSuchElementException.class)
            .hasMessage("This is Left");
    }

    @Test
    void rightShouldReturnCorrectValues() {
        // Given
        Integer rightValue = 100;
        Either<String, Integer> either = Either.right(rightValue);
        // Then
        assertThat(either.isRight()).isTrue();
        assertThat(either.isLeft()).isFalse();
        assertThat(either.getRight()).isEqualTo(rightValue);
    }

    @Test
    void rightShouldThrowExceptionWhenGettingLeftValue() {
        // Given
        Either<String, Integer> either = Either.right(42);
        // When/Then
        assertThatThrownBy(either::getLeft)
            .isInstanceOf(NoSuchElementException.class)
            .hasMessage("This is Right");
    }

    @Test
    void shouldApplyLeftFunctionForLeftInstanceInFold() {
        // Given
        Either<String, Integer> either = Either.left("error");
        Function<String, String> leftFn = s -> "Left: " + s;
        Function<Integer, String> rightFn = i -> "Right: " + i;
        // When
        String result = either.fold(leftFn, rightFn);
        // Then
        assertThat(result).isEqualTo("Left: error");
    }

    @Test
    void shouldApplyRightFunctionForRightInstanceInFold() {
        // Given
        Either<String, Integer> either = Either.right(42);
        Function<String, String> leftFn = s -> "Left: " + s;
        Function<Integer, String> rightFn = i -> "Right: " + i;
        // When
        String result = either.fold(leftFn, rightFn);
        // Then
        assertThat(result).isEqualTo("Right: 42");
    }

    @Test
    void shouldHandleNullReturnValuesInFold() {
        // Given
        Either<String, Integer> leftEither = Either.left("error");
        Either<String, Integer> rightEither = Either.right(42);
        // When
        String leftResult = leftEither.fold(s -> null, i -> "not null");
        String rightResult = rightEither.fold(s -> "not null", i -> null);
        // Then
        assertThat(leftResult).isNull();
        assertThat(rightResult).isNull();
    }

    @Test
    void leftProjectionShouldExistForLeftInstance() {
        // Given
        Either<String, Integer> either = Either.left("error");
        // When
        Either.LeftProjection<String, Integer> projection = either.left();
        // Then
        assertThat(projection.exists()).isTrue();
        assertThat(projection.get()).isEqualTo("error");
    }

    @Test
    void leftProjectionShouldNotExistForRightInstance() {
        // Given
        Either<String, Integer> either = Either.right(42);
        // When
        Either.LeftProjection<String, Integer> projection = either.left();
        // Then
        assertThat(projection.exists()).isFalse();
        assertThatThrownBy(projection::get)
            .isInstanceOf(NoSuchElementException.class)
            .hasMessage("This is Right");
    }

    @Test
    void leftProjectionMapShouldTransformLeftValue() {
        // Given
        Either<String, Integer> either = Either.left("error");
        // When
        Either<Integer, Integer> result = either.left().map(String::length);
        // Then
        assertThat(result.isLeft()).isTrue();
        assertThat(result.getLeft()).isEqualTo(5);
    }

    @Test
    void leftProjectionMapShouldPreserveRightValue() {
        // Given
        Either<String, Integer> either = Either.right(42);
        // When
        Either<Integer, Integer> result = either.left().map(String::length);
        // Then
        assertThat(result.isRight()).isTrue();
        assertThat(result.getRight()).isEqualTo(42);
    }

    @Test
    void leftProjectionFlatMapShouldTransformLeftValue() {
        // Given
        Either<String, Integer> either = Either.left("error");
        // When
        Either<Integer, Integer> result = either.left().flatMap(s -> Either.left(s.length()));
        // Then
        assertThat(result.isLeft()).isTrue();
        assertThat(result.getLeft()).isEqualTo(5);
    }

    @Test
    void leftProjectionFlatMapShouldPreserveRightValue() {
        // Given
        Either<String, Integer> either = Either.right(42);
        // When
        Either<Integer, Integer> result = either.left().flatMap(s -> Either.left(s.length()));
        // Then
        assertThat(result.isRight()).isTrue();
        assertThat(result.getRight()).isEqualTo(42);
    }

    @Test
    void leftProjectionFlatMapCanReturnRight() {
        // Given
        Either<String, Integer> either = Either.left("error");
        // When
        Either<String, Integer> result = either.left().flatMap(s -> Either.right(999));
        // Then
        assertThat(result.isRight()).isTrue();
        assertThat(result.getRight()).isEqualTo(999);
    }

    @Test
    void leftProjectionToOptionalShouldReturnPresentForLeft() {
        // Given
        Either<String, Integer> either = Either.left("error");
        // When
        Optional<String> optional = either.left().toOptional();
        // Then
        assertThat(optional).isPresent();
        assertThat(optional.get()).isEqualTo("error");
    }

    @Test
    void leftProjectionToOptionalShouldReturnEmptyForRight() {
        // Given
        Either<String, Integer> either = Either.right(42);
        // When
        Optional<String> optional = either.left().toOptional();
        // Then
        assertThat(optional).isEmpty();
    }

    @Test
    void leftProjectionConstructorShouldThrowForNullEither() {
        // When/Then
        assertThatThrownBy(() -> new Either.LeftProjection<>(null))
            .isInstanceOf(NullPointerException.class)
            .hasMessage("either can't be null");
    }

    @Test
    void rightProjectionShouldExistForRightInstance() {
        // Given
        Either<String, Integer> either = Either.right(42);
        // When
        Either.RightProjection<String, Integer> projection = either.right();
        // Then
        assertThat(projection.exists()).isTrue();
        assertThat(projection.get()).isEqualTo(42);
    }

    @Test
    void rightProjectionShouldNotExistForLeftInstance() {
        // Given
        Either<String, Integer> either = Either.left("error");
        // When
        Either.RightProjection<String, Integer> projection = either.right();
        // Then
        assertThat(projection.exists()).isFalse();
        assertThatThrownBy(projection::get)
            .isInstanceOf(NoSuchElementException.class)
            .hasMessage("This is Left");
    }

    @Test
    void rightProjectionMapShouldTransformRightValue() {
        // Given
        Either<String, Integer> either = Either.right(42);
        // When
        Either<String, String> result = either.right().map(Object::toString);
        // Then
        assertThat(result.isRight()).isTrue();
        assertThat(result.getRight()).isEqualTo("42");
    }

    @Test
    void rightProjectionMapShouldPreserveLeftValue() {
        // Given
        Either<String, Integer> either = Either.left("error");
        // When
        Either<String, String> result = either.right().map(Object::toString);
        // Then
        assertThat(result.isLeft()).isTrue();
        assertThat(result.getLeft()).isEqualTo("error");
    }

    @Test
    void rightProjectionFlatMapShouldTransformRightValue() {
        // Given
        Either<String, Integer> either = Either.right(42);
        // When
        Either<String, String> result = either.right().flatMap(i -> Either.right(i.toString()));
        // Then
        assertThat(result.isRight()).isTrue();
        assertThat(result.getRight()).isEqualTo("42");
    }

    @Test
    void rightProjectionFlatMapShouldPreserveLeftValue() {
        // Given
        Either<String, Integer> either = Either.left("error");
        // When
        Either<String, String> result = either.right().flatMap(i -> Either.right(i.toString()));
        // Then
        assertThat(result.isLeft()).isTrue();
        assertThat(result.getLeft()).isEqualTo("error");
    }

    @Test
    void rightProjectionFlatMapCanReturnLeft() {
        // Given
        Either<String, Integer> either = Either.right(42);
        // When
        Either<String, Integer> result = either.right().flatMap(i -> Either.left("converted"));
        // Then
        assertThat(result.isLeft()).isTrue();
        assertThat(result.getLeft()).isEqualTo("converted");
    }

    @Test
    void rightProjectionToOptionalShouldReturnPresentForRight() {
        // Given
        Either<String, Integer> either = Either.right(42);
        // When
        Optional<Integer> optional = either.right().toOptional();
        // Then
        assertThat(optional).isPresent();
        assertThat(optional.get()).isEqualTo(42);
    }

    @Test
    void rightProjectionToOptionalShouldReturnEmptyForLeft() {
        // Given
        Either<String, Integer> either = Either.left("error");
        // When
        Optional<Integer> optional = either.right().toOptional();
        // Then
        assertThat(optional).isEmpty();
    }

    @Test
    void rightProjectionConstructorShouldThrowForNullEither() {
        // When/Then
        assertThatThrownBy(() -> new Either.RightProjection<>(null))
            .isInstanceOf(NullPointerException.class)
            .hasMessage("either can't be null");
    }

    @Test
    void shouldHandleNullValuesInTransformations() {
        // Given
        Either<String, Integer> leftEither = Either.left(null);
        Either<String, Integer> rightEither = Either.right(null);
        // When/Then
        assertThat(leftEither.left().map(s -> s == null ? "was null" : s).getLeft())
            .isEqualTo("was null");
        assertThat(rightEither.right().map(i -> i == null ? "was null" : i.toString()).getRight())
            .isEqualTo("was null");
    }

    @Test
    void shouldHandleComplexTypeTransformations() {
        // Given
        Either<Exception, String> either = Either.right("hello world");
        // When
        Either<String, Integer> result = either
            .left().map(Exception::getMessage)
            .right().map(String::length);
        // Then
        assertThat(result.isRight()).isTrue();
        assertThat(result.getRight()).isEqualTo(11);
    }

    @Test
    void shouldChainTransformationsCorrectly() {
        // Given
        Either<String, Integer> either = Either.right(10);
        // When
        Either<String, String> result = either
            .right().flatMap(i -> i > 5 ? Either.right(i * 2) : Either.left("too small"))
            .right().map(i -> "Result: " + i);
        // Then
        assertThat(result.isRight()).isTrue();
        assertThat(result.getRight()).isEqualTo("Result: 20");
    }

    @Test
    void shouldHandleProjectionChainingWithErrorCases() {
        // Given
        Either<String, Integer> either = Either.right(3);
        // When
        Either<String, String> result = either
            .right().flatMap(i -> i > 5 ? Either.right(i * 2) : Either.left("too small"))
            .right().map(i -> "Result: " + i);
        // Then
        assertThat(result.isLeft()).isTrue();
        assertThat(result.getLeft()).isEqualTo("too small");
    }
}

View File

@@ -1,6 +1,9 @@
package io.kestra.core.utils;
import org.junit.Assert;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -25,7 +28,7 @@ class EnumsTest {
@Test
void shouldThrowExceptionGivenInvalidString() {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
assertThrows(IllegalArgumentException.class, () -> {
Enums.getForNameIgnoreCase("invalid", TestEnum.class);
});
}
@@ -49,11 +52,22 @@ class EnumsTest {
String invalidValue = "invalidValue";
// Act & Assert
IllegalArgumentException exception = Assert.assertThrows(IllegalArgumentException.class, () ->
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () ->
Enums.fromString(invalidValue, mapping, "TestEnumWithValue")
);
}
@Test
// Enums.fromList must accept enum constants, their names, or a single value,
// and reject any string that does not name a constant.
void should_get_from_list(){
    assertThat(Enums.fromList(List.of(TestEnum.ENUM1, TestEnum.ENUM2), TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1, TestEnum.ENUM2));
    assertThat(Enums.fromList(List.of("ENUM1", "ENUM2"), TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1, TestEnum.ENUM2));
    // a single (non-list) value is wrapped into a singleton list
    assertThat(Enums.fromList(TestEnum.ENUM1, TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1));
    assertThat(Enums.fromList("ENUM1", TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1));
    assertThrows(IllegalArgumentException.class, () -> Enums.fromList(List.of("string1", "string2"), TestEnum.class));
    assertThrows(IllegalArgumentException.class, () -> Enums.fromList("non enum value", TestEnum.class));
}
enum TestEnum {
ENUM1, ENUM2
}

View File

@@ -6,6 +6,7 @@ import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
class ListUtilsTest {
@@ -36,4 +37,19 @@ class ListUtilsTest {
assertThat(ListUtils.concat(list1, null)).isEqualTo(List.of("1", "2"));
assertThat(ListUtils.concat(null, list2)).isEqualTo(List.of("3", "4"));
}
@Test
// convertToList passes a List through unchanged and rejects non-list inputs.
void convertToList(){
    assertThat(ListUtils.convertToList(List.of(1, 2, 3))).isEqualTo(List.of(1, 2, 3));
    assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToList("not a list"));
}
@Test
// convertToListString accepts only lists whose elements are all strings
// (empty list included) and rejects non-lists and non-string elements.
void convertToListString(){
    assertThat(ListUtils.convertToListString(List.of("string1", "string2"))).isEqualTo(List.of("string1", "string2"));
    assertThat(ListUtils.convertToListString(List.of())).isEqualTo(List.of());
    assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString("not a list"));
    assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString(List.of(1, 2, 3)));
}
}

View File

@@ -9,6 +9,7 @@ import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
class MapUtilsTest {
@SuppressWarnings("unchecked")
@@ -194,4 +195,23 @@ class MapUtilsTest {
assertThat(results).hasSize(1);
// due to ordering change on each JVM restart, the result map would be different as different entries will be skipped
}
@Test
// Flattening {k1:{k2:{k3:v1}}, k4:v2} yields dotted keys "k1.k2.k3" and "k4".
void shouldFlattenANestedMap() {
    Map<String, Object> nested = Map.of(
        "k1", Map.of("k2", Map.of("k3", "v1")),
        "k4", "v2"
    );

    Map<String, Object> flattened = MapUtils.nestedToFlattenMap(nested);

    assertThat(flattened).hasSize(2);
    assertThat(flattened).containsAllEntriesOf(Map.of(
        "k1.k2.k3", "v1",
        "k4", "v2"
    ));
}
@Test
// Flattening is rejected when a nested map holds more than one entry, because
// the dotted-key representation would be ambiguous; the conflicting key is
// reported in the exception message.
void shouldThrowIfNestedMapContainsMultipleEntries() {
    var exception = assertThrows(IllegalArgumentException.class,
        () -> MapUtils.nestedToFlattenMap(Map.of("k1", Map.of("k2", Map.of("k3", "v1"), "k4", "v2")))
    );
    assertThat(exception.getMessage()).isEqualTo("You cannot flatten a map with an entry that is a map of more than one element, conflicting key: k1");
}
}

View File

@@ -0,0 +1,77 @@
package io.kestra.core.validations;

import io.micronaut.context.ApplicationContext;
import io.micronaut.context.exceptions.BeanInstantiationException;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
 * Tests the {@code kestra.url} validation performed by
 * {@link AppConfigValidator} at bean construction (PostConstruct): absent or
 * valid http(s) URLs are accepted, while scheme-less or non-http(s) URLs make
 * the bean fail to instantiate with an "Invalid configuration" error.
 */
class AppConfigValidatorTest {

    @Test
    // No kestra.url configured: validation must pass (the property is optional).
    void validateNoKestraUrl() {
        assertThatCode(() -> {
            try (ApplicationContext context = ApplicationContext.run()) {
                context.getBean(AppConfigValidator.class);
            }
        })
            .as("The bean got initialized properly including the PostConstruct validation")
            .doesNotThrowAnyException();
    }

    @Test
    // A well-formed https URL is accepted.
    void validateValidKestraUrl() {
        assertThatCode(() -> {
            try (ApplicationContext context = ApplicationContext.builder()
                .deduceEnvironment(false)
                .properties(
                    Map.of("kestra.url", "https://postgres-oss.preview.dev.kestra.io")
                )
                .start()
            ) {
                context.getBean(AppConfigValidator.class);
            }
        })
            .as("The bean got initialized properly including the PostConstruct validation")
            .doesNotThrowAnyException();
    }

    @Test
    // A URL without a scheme is rejected at bean instantiation.
    void validateInvalidKestraUrl() {
        assertThatThrownBy(() -> {
            try (ApplicationContext context = ApplicationContext.builder()
                .deduceEnvironment(false)
                .properties(
                    Map.of("kestra.url", "postgres-oss.preview.dev.kestra.io")
                )
                .start()
            ) {
                context.getBean(AppConfigValidator.class);
            }
        })
            .as("The bean initialization failed at PostConstruct")
            .isInstanceOf(BeanInstantiationException.class)
            .hasMessageContaining("Invalid configuration");
    }

    @Test
    // A non-http(s) scheme (ftp) is likewise rejected.
    void validateNonHttpKestraUrl() {
        assertThatThrownBy(() -> {
            try (ApplicationContext context = ApplicationContext.builder()
                .deduceEnvironment(false)
                .properties(
                    Map.of("kestra.url", "ftp://postgres-oss.preview.dev.kestra.io")
                )
                .start()
            ) {
                context.getBean(AppConfigValidator.class);
            }
        })
            .as("The bean initialization failed at PostConstruct")
            .isInstanceOf(BeanInstantiationException.class)
            .hasMessageContaining("Invalid configuration");
    }
}

View File

@@ -54,4 +54,10 @@ class ForEachTest {
assertThat(execution.findTaskRunsByTaskId("e1").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
// Nested ForEach (a ForEach task inside another ForEach): the whole execution
// must terminate in SUCCESS.
@Test
@ExecuteFlow("flows/valids/foreach-nested.yaml")
void nested(Execution execution) {
    assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
}

View File

@@ -2,7 +2,6 @@ package io.kestra.plugin.core.flow;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.is;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
@@ -104,4 +103,61 @@ class RuntimeLabelsTest {
new Label("taskRunId", labelsTaskRunId),
new Label("existingLabel", "someValue"));
}
@Test
@LoadFlows({"flows/valids/primitive-labels-flow.yml"})
void primitiveTypeLabelsOverrideExistingLabels() throws TimeoutException, QueueException {
    // Start the execution with pre-existing labels and primitive-typed inputs;
    // the flow is expected to override those labels with the stringified input values.
    List<Label> initialLabels = List.of(
        new Label("intValue", "1"),
        new Label("boolValue", "false"),
        new Label("floatValue", "4.2f")
    );
    Execution result = runnerUtils.runOne(
        MAIN_TENANT,
        "io.kestra.tests",
        "primitive-labels-flow",
        null,
        (flow, createdExecution) -> Map.of(
            "intLabel", 42,
            "boolLabel", true,
            "floatLabel", 3.14f
        ),
        null,
        initialLabels
    );

    assertThat(result.getTaskRunList()).hasSize(1);
    assertThat(result.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);

    // The flow also sets a "taskRunId" label from the "update-labels" task run.
    String updateLabelsTaskRunId = result.findTaskRunsByTaskId("update-labels").getFirst().getId();
    assertThat(result.getLabels()).containsExactlyInAnyOrder(
        new Label(Label.CORRELATION_ID, result.getId()),
        new Label("intValue", "42"),
        new Label("boolValue", "true"),
        new Label("floatValue", "3.14"),
        new Label("taskRunId", updateLabelsTaskRunId));
}
@Test
@LoadFlows({"flows/valids/labels-update-task-deduplicate.yml"})
void updateGetsDeduplicated() throws TimeoutException, QueueException {
    // The flow's two Labels tasks each set the same key twice; only one
    // value per key must remain on the execution afterwards.
    Execution result = runnerUtils.runOne(
        MAIN_TENANT,
        "io.kestra.tests",
        "labels-update-task-deduplicate",
        null,
        (flow, createdExecution) -> Map.of(),
        null,
        List.of()
    );

    assertThat(result.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
    assertThat(result.getTaskRunList()).hasSize(2);
    assertThat(result.getLabels()).containsExactlyInAnyOrder(
        new Label(Label.CORRELATION_ID, result.getId()),
        new Label("fromStringKey", "value2"),
        new Label("fromListKey", "value2")
    );
}
}

View File

@@ -0,0 +1,90 @@
package io.kestra.plugin.core.flow;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.RunnerUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Runner tests for the {@code Subflow} task: label inheritance/overriding and
 * outputs of a child execution launched with {@code wait: false}.
 */
@KestraTest(startRunner = true)
class SubflowRunnerTest {
    @Inject
    private RunnerUtils runnerUtils;

    @Inject
    private ExecutionRepositoryInterface executionRepository;

    @Inject
    @Named(QueueFactoryInterface.EXECUTION_NAMED)
    protected QueueInterface<Execution> executionQueue;

    @Test
    @LoadFlows({"flows/valids/subflow-inherited-labels-child.yaml", "flows/valids/subflow-inherited-labels-parent.yaml"})
    void inheritedLabelsAreOverridden() throws QueueException, TimeoutException {
        Execution parentExecution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-inherited-labels-parent");

        // The parent only carries its own flow labels plus the correlation ID.
        assertThat(parentExecution.getLabels()).containsExactlyInAnyOrder(
            new Label(Label.CORRELATION_ID, parentExecution.getId()),
            new Label("parentFlowLabel1", "value1"),
            new Label("parentFlowLabel2", "value2")
        );

        // The Subflow task exposes the launched child execution's ID as an output.
        String childExecutionId = (String) parentExecution.findTaskRunsByTaskId("launch").getFirst().getOutputs().get("executionId");
        assertThat(childExecutionId).isNotBlank();

        Execution childExecution = executionRepository.findById(MAIN_TENANT, childExecutionId).orElseThrow();
        assertThat(childExecution.getLabels()).containsExactlyInAnyOrder(
            new Label(Label.CORRELATION_ID, parentExecution.getId()), // parent's correlation ID
            new Label("childFlowLabel1", "value1"), // defined by the subtask flow
            new Label("childFlowLabel2", "value2"), // defined by the subtask flow
            new Label("launchTaskLabel", "launchFoo"), // added by Subtask
            new Label("parentFlowLabel1", "launchBar"), // overridden by Subtask
            new Label("parentFlowLabel2", "value2") // inherited from the parent flow
        );
    }

    @Test
    @LoadFlows({"flows/valids/subflow-parent-no-wait.yaml", "flows/valids/subflow-child-with-output.yaml"})
    void subflowOutputWithoutWait() throws QueueException, TimeoutException, InterruptedException {
        AtomicReference<Execution> childExecution = new AtomicReference<>();
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // With wait=false the parent finishes before the child does, so the child's
        // terminal state has to be observed asynchronously on the execution queue.
        Runnable closing = executionQueue.receive(either -> {
            if (either.isLeft() && either.getLeft().getFlowId().equals("subflow-child-with-output") && either.getLeft().getState().isTerminated()) {
                childExecution.set(either.getLeft());
                countDownLatch.countDown();
            }
        });

        // Always close the queue consumer, even when an assertion fails; otherwise
        // the receiver would leak into subsequent tests.
        try {
            Execution parentExecution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-parent-no-wait");

            String childExecutionId = (String) parentExecution.findTaskRunsByTaskId("subflow").getFirst().getOutputs().get("executionId");
            assertThat(childExecutionId).isNotBlank();
            assertThat(parentExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
            assertThat(parentExecution.getTaskRunList()).hasSize(1);

            assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
            assertThat(childExecution.get().getId()).isEqualTo(childExecutionId);
            assertThat(childExecution.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
            assertThat(childExecution.get().getTaskRunList()).hasSize(1);
        } finally {
            closing.run();
        }
    }
}

View File

@@ -4,6 +4,7 @@ namespace: io.kestra.tests
tasks:
- id: cache
type: io.kestra.core.runners.TaskCacheTest$CounterTask
workingDir: "{{workingDir}}"
taskCache:
enabled: true
ttl: PT1S

View File

@@ -0,0 +1,21 @@
# Test fixture: fans out over an input file with ForEachItem, invoking the
# concurrency-limited 'flow-concurrency-queue' flow once per batch.
id: flow-concurrency-for-each-item
namespace: io.kestra.tests

inputs:
  - id: file
    type: FILE
  - id: batch
    type: INT

tasks:
  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ inputs.file }}"
    batch:
      # Number of rows per batch, taken from the 'batch' input.
      rows: "{{inputs.batch}}"
    namespace: io.kestra.tests
    flowId: flow-concurrency-queue
    # Wait for each child execution and propagate its failure to this flow.
    wait: true
    transmitFailed: true
    inputs:
      items: "{{ taskrun.items }}"

View File

@@ -0,0 +1,13 @@
# Test fixture: QUEUE-concurrency flow (limit 1) that sleeps then fails,
# exercising the behavior of queued executions when the running one fails.
id: flow-concurrency-queue-fail
namespace: io.kestra.tests

concurrency:
  # Executions above the limit are queued instead of being rejected.
  behavior: QUEUE
  limit: 1

tasks:
  - id: sleep
    type: io.kestra.plugin.core.flow.Sleep
    duration: PT2S
  - id: fail
    type: io.kestra.plugin.core.execution.Fail

View File

@@ -0,0 +1,21 @@
# Test fixture for nested ForEach: an outer ForEach over two values, each running
# an inner ForEach over three values, with cross-iteration output lookups.
id: foreach-nested
namespace: io.kestra.tests

tasks:
  - id: each0
    type: io.kestra.plugin.core.flow.ForEach
    values: ["l1", "l2"]
    tasks:
      - id: each1
        type: io.kestra.plugin.core.flow.ForEach
        # 0 presumably disables the limit so all iterations may run concurrently — confirm against ForEach docs.
        concurrencyLimit: 0
        values: ["d1", "d2", "d3"]
        tasks:
          - id: p1
            type: io.kestra.plugin.core.debug.Return
            format: "{{ parent.taskrun.value }}-{{ taskrun.value }}"
          - id: p2
            type: io.kestra.plugin.core.debug.Return
            # Reads p1's output for the current (outer value, inner value) pair.
            format: "{{ outputs.p1[parent.taskrun.value][taskrun.value].value }}"
  - id: test
    type: io.kestra.plugin.core.debug.Return
    format: "{{ outputs.p1 }}"

View File

@@ -0,0 +1,14 @@
# Test fixture: each Labels task sets the same key twice (once via a JSON-string
# map, once via a list) to verify duplicated label keys get deduplicated.
id: labels-update-task-deduplicate
namespace: io.kestra.tests

tasks:
  - id: from-string
    type: io.kestra.plugin.core.execution.Labels
    # Duplicate "fromStringKey" inside a JSON object string.
    labels: "{ \"fromStringKey\": \"value1\", \"fromStringKey\": \"value2\" }"
  - id: from-list
    type: io.kestra.plugin.core.execution.Labels
    # Duplicate "fromListKey" as two list entries.
    labels:
      - key: "fromListKey"
        value: "value1"
      - key: "fromListKey"
        value: "value2"

View File

@@ -0,0 +1,12 @@
# Test fixture: child flow exposing a single flow output ("flow_a_output")
# derived from the Return task's value.
id: subflow-child-with-output
namespace: io.kestra.tests

tasks:
  - id: return
    type: io.kestra.plugin.core.debug.Return
    format: "Some value"

outputs:
  - id: flow_a_output
    type: STRING
    value: "{{ outputs.return.value }}"

View File

@@ -0,0 +1,11 @@
# Test fixture: child flow with its own labels, used to verify how flow labels
# combine with labels inherited from a parent Subflow task.
id: subflow-inherited-labels-child
namespace: io.kestra.tests

labels:
  childFlowLabel1: value1
  childFlowLabel2: value2

tasks:
  - id: return
    type: io.kestra.plugin.core.log.Log
    message: "{{ execution.id }}"

View File

@@ -0,0 +1,18 @@
# Test fixture: parent flow launching subflow-inherited-labels-child with
# inheritLabels enabled; the task's own labels add to / override inherited ones.
id: subflow-inherited-labels-parent
namespace: io.kestra.tests

labels:
  parentFlowLabel1: value1
  parentFlowLabel2: value2

tasks:
  - id: launch
    type: io.kestra.plugin.core.flow.Subflow
    namespace: io.kestra.tests
    flowId: subflow-inherited-labels-child
    wait: true
    transmitFailed: true
    inheritLabels: true
    labels:
      launchTaskLabel: launchFoo
      # Same key as a parent flow label: the task-level value takes precedence.
      parentFlowLabel1: launchBar

View File

@@ -0,0 +1,9 @@
# Test fixture: launches subflow-child-with-output without waiting for it
# (wait: false), so the parent completes independently of the child.
id: subflow-parent-no-wait
namespace: io.kestra.tests

tasks:
  - id: subflow
    type: io.kestra.plugin.core.flow.Subflow
    namespace: io.kestra.tests
    flowId: subflow-child-with-output
    wait: false

View File

@@ -1,152 +1,199 @@
#!/bin/bash
#!/usr/bin/env bash
#===============================================================================
# SCRIPT: release-plugins.sh
#
# DESCRIPTION:
# This script can be used to run a ./gradlew release command on each kestra plugin repository.
# By default, if no `GITHUB_PAT` environment variable exist, the script will attempt to clone GitHub repositories using SSH_KEY.
# Runs Gradle release for one or multiple Kestra plugin repositories.
# - If $GITHUB_PAT is set, HTTPS cloning via PAT is used.
# - Otherwise, SSH cloning is used (requires SSH key configured on runner).
#
# USAGE:
# ./release-plugins.sh [options] [plugin-repositories...]
#
# USAGE: ./release-plugins.sh [options]
# OPTIONS:
# --release-version <version> Specify the release version (required)
# --next-version <version> Specify the next version (required)
# --dry-run Specify to run in DRY_RUN.
# -y, --yes Automatically confirm prompts (non-interactive).
# -h, --help Show the help message and exit
# --release-version <version> Specify the release version (required).
# --next-version <version> Specify the next (development) version (required).
# --plugin-file <path> File containing the plugin list (default: ../.plugins).
# --dry-run Run in DRY_RUN mode (no publish, no changes pushed).
# --only-changed Skip repositories with no commits since last tag (or --since-tag).
# --since-tag <tag> Use this tag as base for change detection (default: last tag).
# -y, --yes Automatically confirm prompts (non-interactive).
# -h, --help Show this help message and exit.
#
# EXAMPLES:
# To release all plugins:
# ./release-plugins.sh --release-version=0.20.0 --next-version=0.21.0-SNAPSHOT
# To release a specific plugin:
# ./release-plugins.sh --release-version=0.20.0 --next-version=0.21.0-SNAPSHOT plugin-kubernetes
# To release specific plugins from file:
# ./release-plugins.sh --release-version=0.20.0 --plugin-file .plugins
# # Release all plugins from .plugins:
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT
#
# # Release a specific plugin:
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT plugin-kubernetes
#
# # Release specific plugins from file:
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT --plugin-file .plugins
#
# # Release only plugins that have changed since the last tag:
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT --only-changed --yes
#===============================================================================
set -e;
set -euo pipefail
###############################################################
# Global vars
# Globals
###############################################################
BASEDIR=$(dirname "$(readlink -f $0)")
WORKING_DIR=/tmp/kestra-release-plugins-$(date +%s);
BASEDIR=$(dirname "$(readlink -f "$0")")
WORKING_DIR="/tmp/kestra-release-plugins-$(date +%s)"
PLUGIN_FILE="$BASEDIR/../.plugins"
GIT_BRANCH=master
GIT_BRANCH="master" # Fallback if default branch cannot be detected
###############################################################
# Functions
###############################################################
# Function to display the help message
usage() {
echo "Usage: $0 --release-version <version> --next-version [plugin-repositories...]"
echo
echo "Options:"
echo " --release-version <version> Specify the release version (required)."
echo " --next-version <version> Specify the next version (required)."
echo " --plugin-file File containing the plugin list (default: .plugins)"
echo " --dry-run Specify to run in DRY_RUN."
echo " -y, --yes Automatically confirm prompts (non-interactive)."
echo " -h, --help Show this help message and exit."
exit 1
echo "Usage: $0 --release-version <version> --next-version <version> [options] [plugin-repositories...]"
echo
echo "Options:"
echo " --release-version <version> Specify the release version (required)."
echo " --next-version <version> Specify the next version (required)."
echo " --plugin-file <path> File containing the plugin list (default: ../.plugins)."
echo " --dry-run Run in DRY_RUN mode."
echo " --only-changed Skip repositories with no commits since last tag (or --since-tag)."
echo " --since-tag <tag> Use this tag as base for change detection (default: last tag)."
echo " -y, --yes Automatically confirm prompts (non-interactive)."
echo " -h, --help Show this help message and exit."
exit 1
}
# Function to ask to continue
function askToContinue() {
read -p "Are you sure you want to continue? [y/N] " confirm
askToContinue() {
read -r -p "Are you sure you want to continue? [y/N] " confirm
[[ "$confirm" =~ ^[Yy]$ ]] || { echo "Operation cancelled."; exit 1; }
}
# Detect default branch from remote; fallback to $GIT_BRANCH if unknown
detect_default_branch() {
local default_branch
default_branch=$(git remote show origin | sed -n '/HEAD branch/s/.*: //p' || true)
if [[ -z "${default_branch:-}" ]]; then
default_branch="$GIT_BRANCH"
fi
echo "$default_branch"
}
# Return last tag that matches v* or any tag if v* not found; empty if none
last_tag_or_empty() {
local tag
tag=$(git tag --list 'v*' --sort=-v:refname | head -n1 || true)
if [[ -z "${tag:-}" ]]; then
tag=$(git tag --sort=-creatordate | head -n1 || true)
fi
echo "$tag"
}
# True (0) if there are commits since tag on branch, False (1) otherwise.
has_changes_since_tag() {
local tag="$1"
local branch="$2"
if [[ -z "$tag" ]]; then
# No tag => consider it as changed (first release)
return 0
fi
git fetch --tags --quiet
git fetch origin "$branch" --quiet
local count
count=$(git rev-list --count "${tag}..origin/${branch}" || echo "0")
[[ "${count}" -gt 0 ]]
}
###############################################################
# Options
# Options parsing
###############################################################
PLUGINS_ARGS=()
AUTO_YES=false
DRY_RUN=false
# Get the options
ONLY_CHANGED=false
SINCE_TAG=""
RELEASE_VERSION=""
NEXT_VERSION=""
while [[ "$#" -gt 0 ]]; do
case "$1" in
--release-version)
RELEASE_VERSION="$2"
shift 2
;;
--release-version=*)
RELEASE_VERSION="${1#*=}"
shift
;;
--next-version)
NEXT_VERSION="$2"
shift 2
;;
--next-version=*)
NEXT_VERSION="${1#*=}"
shift
;;
--plugin-file)
PLUGIN_FILE="$2"
shift 2
;;
--plugin-file=*)
PLUGIN_FILE="${1#*=}"
shift
;;
--dry-run)
DRY_RUN=true
shift
;;
-y|--yes)
AUTO_YES=true
shift
;;
-h|--help)
usage
;;
*)
PLUGINS_ARGS+=("$1")
shift
;;
esac
case "$1" in
--release-version)
RELEASE_VERSION="$2"; shift 2 ;;
--release-version=*)
RELEASE_VERSION="${1#*=}"; shift ;;
--next-version)
NEXT_VERSION="$2"; shift 2 ;;
--next-version=*)
NEXT_VERSION="${1#*=}"; shift ;;
--plugin-file)
PLUGIN_FILE="$2"; shift 2 ;;
--plugin-file=*)
PLUGIN_FILE="${1#*=}"; shift ;;
--dry-run)
DRY_RUN=true; shift ;;
--only-changed)
ONLY_CHANGED=true; shift ;;
--since-tag)
SINCE_TAG="$2"; shift 2 ;;
--since-tag=*)
SINCE_TAG="${1#*=}"; shift ;;
-y|--yes)
AUTO_YES=true; shift ;;
-h|--help)
usage ;;
*)
PLUGINS_ARGS+=("$1"); shift ;;
esac
done
## Check options
# Required options
if [[ -z "$RELEASE_VERSION" ]]; then
echo -e "Missing required argument: --release-version\n";
usage
echo -e "Missing required argument: --release-version\n"; usage
fi
if [[ -z "$NEXT_VERSION" ]]; then
echo -e "Missing required argument: --next-version\n";
usage
echo -e "Missing required argument: --next-version\n"; usage
fi
## Get plugin list
###############################################################
# Build plugin list (from args or from .plugins)
###############################################################
PLUGINS_ARRAY=()
PLUGINS_COUNT=0
if [[ "${#PLUGINS_ARGS[@]}" -eq 0 ]]; then
if [ -f "$PLUGIN_FILE" ]; then
PLUGINS=$(cat "$PLUGIN_FILE" | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | cut -d':' -f1 | uniq | sort);
PLUGINS_COUNT=$(echo "$PLUGINS" | wc -l);
PLUGINS_ARRAY=$(echo "$PLUGINS" | xargs || echo '');
PLUGINS_ARRAY=($PLUGINS_ARRAY);
if [[ -f "$PLUGIN_FILE" ]]; then
# Keep only uncommented lines, then keep the first column (repo name)
mapfile -t PLUGINS_ARRAY < <(
grep -E '^\s*[^#]' "$PLUGIN_FILE" 2>/dev/null \
| grep "io\.kestra\." \
| cut -d':' -f1 \
| uniq | sort
)
PLUGINS_COUNT="${#PLUGINS_ARRAY[@]}"
else
echo "Plugin file not found: $PLUGIN_FILE"
exit 1
fi
else
PLUGINS_ARRAY=("${PLUGINS_ARGS[@]}")
PLUGINS_COUNT="${#PLUGINS_ARGS[@]}"
fi
# Extract the major and minor versions
# Extract major.minor (e.g. 0.21) to build the release branch name
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
PUSH_RELEASE_BRANCH="releases/v${BASE_VERSION}.x"
## Get plugin list
echo "RELEASE_VERSION=$RELEASE_VERSION"
echo "NEXT_VERSION=$NEXT_VERSION"
echo "PUSH_RELEASE_BRANCH=$PUSH_RELEASE_BRANCH"
echo "GIT_BRANCH=$GIT_BRANCH"
echo "GIT_BRANCH=$GIT_BRANCH (fallback)"
echo "DRY_RUN=$DRY_RUN"
echo "Found ($PLUGINS_COUNT) plugin repositories:";
echo "ONLY_CHANGED=$ONLY_CHANGED"
echo "SINCE_TAG=${SINCE_TAG:-<auto>}"
echo "Found ($PLUGINS_COUNT) plugin repositories:"
for PLUGIN in "${PLUGINS_ARRAY[@]}"; do
echo "$PLUGIN"
echo " - $PLUGIN"
done
if [[ "$AUTO_YES" == false ]]; then
@@ -156,49 +203,77 @@ fi
###############################################################
# Main
###############################################################
mkdir -p $WORKING_DIR
mkdir -p "$WORKING_DIR"
COUNTER=1;
for PLUGIN in "${PLUGINS_ARRAY[@]}"
do
cd $WORKING_DIR;
COUNTER=1
for PLUGIN in "${PLUGINS_ARRAY[@]}"; do
cd "$WORKING_DIR"
echo "---------------------------------------------------------------------------------------"
echo "[$COUNTER/$PLUGINS_COUNT] Release Plugin: $PLUGIN"
echo "---------------------------------------------------------------------------------------"
if [[ -z "${GITHUB_PAT}" ]]; then
git clone git@github.com:kestra-io/$PLUGIN
# Clone the repo using SSH, otherwise PAT if provided
if [[ -z "${GITHUB_PAT:-}" ]]; then
git clone "git@github.com:kestra-io/${PLUGIN}.git"
else
echo "Clone git repository using GITHUB PAT"
git clone https://${GITHUB_PAT}@github.com/kestra-io/$PLUGIN.git
git clone "https://${GITHUB_PAT}@github.com/kestra-io/${PLUGIN}.git"
fi
cd "$PLUGIN";
if [[ "$PLUGIN" == "plugin-transform" ]] && [[ "$GIT_BRANCH" == "master" ]]; then # quickfix
git checkout main;
else
git checkout "$GIT_BRANCH";
cd "$PLUGIN"
# Determine the default branch dynamically to avoid hardcoding "master"/"main"
DEFAULT_BRANCH=$(detect_default_branch)
git checkout "$DEFAULT_BRANCH"
# Skip if the release tag already exists on remote (check both with and without 'v' prefix)
TAG_EXISTS=$(
{ git ls-remote --tags origin "refs/tags/v${RELEASE_VERSION}" \
&& git ls-remote --tags origin "refs/tags/${RELEASE_VERSION}"; } | wc -l
)
if [[ "$TAG_EXISTS" -ne 0 ]]; then
echo "Tag ${RELEASE_VERSION} already exists for $PLUGIN. Skipping..."
COUNTER=$(( COUNTER + 1 ))
continue
fi
# Change detection (if requested)
if [[ "$ONLY_CHANGED" == true ]]; then
git fetch --tags --quiet
git fetch origin "$DEFAULT_BRANCH" --quiet
BASE_TAG="$SINCE_TAG"
if [[ -z "$BASE_TAG" ]]; then
BASE_TAG=$(last_tag_or_empty)
fi
if has_changes_since_tag "$BASE_TAG" "$DEFAULT_BRANCH"; then
echo "Changes detected since ${BASE_TAG:-<no-tag>} on ${DEFAULT_BRANCH}, proceeding."
else
echo "No changes since ${BASE_TAG:-<no-tag>} on ${DEFAULT_BRANCH} for $PLUGIN. Skipping..."
COUNTER=$(( COUNTER + 1 ))
continue
fi
fi
if [[ "$DRY_RUN" == false ]]; then
CURRENT_BRANCH=$(git branch --show-current);
echo "Run gradle release for plugin: $PLUGIN";
echo "Branch: $CURRENT_BRANCH";
CURRENT_BRANCH=$(git branch --show-current)
echo "Run gradle release for plugin: $PLUGIN"
echo "Branch: $CURRENT_BRANCH"
if [[ "$AUTO_YES" == false ]]; then
askToContinue
fi
# Create and push release branch
git checkout -b "$PUSH_RELEASE_BRANCH";
git push -u origin "$PUSH_RELEASE_BRANCH";
# Create and push the release branch (branch that will hold the release versions)
git checkout -b "$PUSH_RELEASE_BRANCH"
git push -u origin "$PUSH_RELEASE_BRANCH"
# Run gradle release
git checkout "$CURRENT_BRANCH";
# Switch back to the working branch to run the gradle release
git checkout "$CURRENT_BRANCH"
# Run Gradle release with snapshot tolerance if releaseVersion contains -SNAPSHOT
if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
# -SNAPSHOT qualifier maybe used to test release-candidates
./gradlew release -Prelease.useAutomaticVersion=true \
-Prelease.releaseVersion="${RELEASE_VERSION}" \
-Prelease.newVersion="${NEXT_VERSION}" \
@@ -211,19 +286,28 @@ do
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
fi
git push;
# Update the upper bound version of kestra
# Push new commits/tags created by the release plugin
git push --follow-tags
# Update the upper bound version of Kestra on the release branch (e.g., [0.21,))
PLUGIN_KESTRA_VERSION="[${BASE_VERSION},)"
git checkout "$PUSH_RELEASE_BRANCH" && git pull;
git checkout "$PUSH_RELEASE_BRANCH" && git pull --ff-only
sed -i "s/^kestraVersion=.*/kestraVersion=${PLUGIN_KESTRA_VERSION}/" ./gradle.properties
git add ./gradle.properties
git commit -m"chore(deps): update kestraVersion to ${PLUGIN_KESTRA_VERSION}."
git push
sleep 5; # add a short delay to not spam Maven Central
else
echo "Skip gradle release [DRY_RUN=true]";
fi
COUNTER=$(( COUNTER + 1 ));
done;
exit 0;
# Commit only if there are actual changes staged
if ! git diff --cached --quiet; then
git commit -m "chore(deps): update kestraVersion to ${PLUGIN_KESTRA_VERSION}."
git push
fi
# Small delay to avoid hammering Maven Central
sleep 5
else
echo "Skip gradle release [DRY_RUN=true]"
fi
COUNTER=$(( COUNTER + 1 ))
done
exit 0

View File

@@ -1,4 +1,4 @@
version=0.24.0-SNAPSHOT
version=1.0.0-SNAPSHOT
org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
org.gradle.parallel=true

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.h2;

import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.h2.H2Repository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

/**
 * H2 binding of the execution-running storage: wires the generic JDBC storage to
 * the H2 repository registered under the "executionrunning" name. Only active
 * when the H2 queue backend is enabled.
 */
@Singleton
@H2QueueEnabled
public class H2ExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
    public H2ExecutionRunningStorage(@Named("executionrunning") H2Repository<ExecutionRunning> repository) {
        super(repository);
    }
}

View File

@@ -144,4 +144,12 @@ public class H2QueueFactory implements QueueFactoryInterface {
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
return new H2Queue<>(SubflowExecutionEnd.class, applicationContext);
}
/**
 * Creates the queue carrying {@code ExecutionRunning} messages.
 * NOTE(review): presumably used for flow concurrency tracking — confirm against the executor.
 */
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close") // close the queue when the application context shuts down
public QueueInterface<ExecutionRunning> executionRunning() {
    return new H2Queue<>(ExecutionRunning.class, applicationContext);
}
}

View File

@@ -0,0 +1,29 @@
-- Table backing the execution-running storage (one row per running execution,
-- presumably for flow concurrency accounting — confirm against the executor).
CREATE TABLE IF NOT EXISTS execution_running (
    "key" VARCHAR(250) NOT NULL PRIMARY KEY,
    "value" TEXT NOT NULL,
    -- Generated columns extracted from the JSON payload for indexed lookups;
    -- NOTE(review): tenant_id has no NOT NULL, unlike namespace/flow_id — confirm intended.
    "tenant_id" VARCHAR(250) GENERATED ALWAYS AS (JQ_STRING("value", '.tenantId')),
    "namespace" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.namespace')),
    "flow_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.flowId'))
);

-- Lookup index for per-flow queries.
CREATE INDEX IF NOT EXISTS execution_running__flow ON execution_running ("tenant_id", "namespace", "flow_id");

-- Add 'io.kestra.core.runners.ExecutionRunning' to the queues type enum;
-- the complete value list is restated because the column type is redefined.
ALTER TABLE queues ALTER COLUMN "type" ENUM(
    'io.kestra.core.models.executions.Execution',
    'io.kestra.core.models.templates.Template',
    'io.kestra.core.models.executions.ExecutionKilled',
    'io.kestra.core.runners.WorkerJob',
    'io.kestra.core.runners.WorkerTaskResult',
    'io.kestra.core.runners.WorkerInstance',
    'io.kestra.core.runners.WorkerTaskRunning',
    'io.kestra.core.models.executions.LogEntry',
    'io.kestra.core.models.triggers.Trigger',
    'io.kestra.ee.models.audits.AuditLog',
    'io.kestra.core.models.executions.MetricEntry',
    'io.kestra.core.runners.WorkerTriggerResult',
    'io.kestra.core.runners.SubflowExecutionResult',
    'io.kestra.core.server.ClusterEvent',
    'io.kestra.core.runners.SubflowExecutionEnd',
    'io.kestra.core.models.flows.FlowInterface',
    'io.kestra.core.runners.ExecutionRunning'
) NOT NULL

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.mysql;

import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.mysql.MysqlRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

/**
 * MySQL binding of the execution-running storage: wires the generic JDBC storage
 * to the MySQL repository registered under the "executionrunning" name. Only
 * active when the MySQL queue backend is enabled.
 */
@Singleton
@MysqlQueueEnabled
public class MysqlExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
    public MysqlExecutionRunningStorage(@Named("executionrunning") MysqlRepository<ExecutionRunning> repository) {
        super(repository);
    }
}

View File

@@ -144,4 +144,12 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
return new MysqlQueue<>(SubflowExecutionEnd.class, applicationContext);
}
/**
 * Creates the queue carrying {@code ExecutionRunning} messages.
 * NOTE(review): presumably used for flow concurrency tracking — confirm against the executor.
 */
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close") // close the queue when the application context shuts down
public QueueInterface<ExecutionRunning> executionRunning() {
    return new MysqlQueue<>(ExecutionRunning.class, applicationContext);
}
}

View File

@@ -0,0 +1,28 @@
CREATE TABLE IF NOT EXISTS execution_running (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
`tenant_id` VARCHAR(250) GENERATED ALWAYS AS (value ->> '$.tenantId') STORED,
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
`flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
INDEX ix_flow (tenant_id, namespace, flow_id)
);
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning'
) NOT NULL;

View File

@@ -0,0 +1 @@
-- Make the queues `offset` column an auto-incrementing BIGINT.
-- NOTE(review): presumably ensures monotonically increasing offsets for queue polling — confirm.
ALTER TABLE queues MODIFY COLUMN `offset` BIGINT NOT NULL AUTO_INCREMENT;

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.postgres;

import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.postgres.PostgresRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

/**
 * PostgreSQL binding of the execution-running storage: wires the generic JDBC
 * storage to the PostgreSQL repository registered under the "executionrunning"
 * name. Only active when the PostgreSQL queue backend is enabled.
 */
@Singleton
@PostgresQueueEnabled
public class PostgresExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
    public PostgresExecutionRunningStorage(@Named("executionrunning") PostgresRepository<ExecutionRunning> repository) {
        super(repository);
    }
}

View File

@@ -144,4 +144,12 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
return new PostgresQueue<>(SubflowExecutionEnd.class, applicationContext);
}
/**
 * Creates the queue carrying {@code ExecutionRunning} messages.
 * NOTE(review): presumably used for flow concurrency tracking — confirm against the executor.
 */
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close") // close the queue when the application context shuts down
public QueueInterface<ExecutionRunning> executionRunning() {
    return new PostgresQueue<>(ExecutionRunning.class, applicationContext);
}
}

Some files were not shown because too many files have changed in this diff Show More