Compare commits

..

201 Commits

Author SHA1 Message Date
Loïc Mathieu
8746c24170 chore(system): extract queue consumers processing into message handlers 2025-11-19 11:40:56 +01:00
Loïc Mathieu
a09f61fcfd fix(flow): flow trigger with both conditions and preconditions
When a flow have both a condition and a precondition, the condition was evaluated twice which lead to double execution triggered.

Fixes
2025-11-14 18:16:32 +01:00
Loïc Mathieu
687ce00d33 fix(test): increase indexing waiting sleep 2025-11-13 18:08:39 +01:00
Loïc Mathieu
133828bdf1 feat(core): remove deprecated runner property in favor or taskRunner
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
94d0975b78 feat(core): remove Property deprecated methdso and constructors
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
b6e44954c6 feat(flow): remove FILE input extension
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
0167f5f806 feat(flow): remove JSON flow support
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
97769faba7 feat(flow): remove state store
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
e6b5c8ec77 chore(system): remove kafka stream 2025-11-10 12:26:46 +01:00
Loïc Mathieu
052120766e fix(system): trigger an execution once per condition on flow triggers
Fixes #12560
2025-11-10 12:26:46 +01:00
Loïc Mathieu
999719ea69 fix(core): remove PostgresSchedulerScheduleTest as other JDBC impl didn't have it 2025-11-10 12:26:46 +01:00
Loïc Mathieu
f0790af2e5 feat(system): refactor concurrency limit 2025-11-10 12:26:46 +01:00
Loïc Mathieu
8323691aa3 feat(system): move the DefaultServiceLivenessCoordinator to the executor
As it is only started by the executor it should be inside this module
2025-11-10 12:26:46 +01:00
Loïc Mathieu
1f50be8828 feat(system): move flow topoloigy in its own component 2025-11-10 12:26:46 +01:00
Loïc Mathieu
93de3ecbb0 fix(system): MySQL migration 2025-11-10 12:26:46 +01:00
Loïc Mathieu
a88db9b0ad feat(system): rename WorkerGroupExecutor to WorkerGroupMetaStore 2025-11-10 12:26:46 +01:00
Loïc Mathieu
1401cac418 feat(services): use a single service liveness coordinator 2025-11-10 12:26:46 +01:00
Loïc Mathieu
6e2aaaf8a0 feat(system): un-couple queues and repositories 2025-11-10 12:26:46 +01:00
Loïc Mathieu
ff5d07cef8 feat(system): queue indexer 2025-11-10 12:26:46 +01:00
Loïc Mathieu
b44a855aa5 feat(system): Executor v2 2025-11-10 12:26:46 +01:00
Loïc Mathieu
d499c621d6 fix(locks): tryLock should release the lock 2025-11-04 14:25:08 +01:00
Loïc Mathieu
f6944d4e45 feat(system): improve locks
- Switch LockException to be a runtime exception
- Implements a tryLock() mechanism so skip the runnable if it's already locked
2025-11-04 14:25:08 +01:00
Florian Hussonnois
7f17e42da2 refactor(system): extract JdbcQueuePoller class from JdbcQueue
Extract a JdbcQueueConfiguration and JdbcQueuePoller classes from
JdbcQueue to improve clarity, testability and reuse of the code.
2025-11-04 14:25:08 +01:00
Loïc Mathieu
9bea470010 feat(flows): remove deprecated Schedule.scheduleConditions
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:25:08 +01:00
Loïc Mathieu
9baf648a24 feat(flows): remove deprecated FlowCondition and FlowNamespaceCondition
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:25:08 +01:00
Loïc Mathieu
0bfbee9a8a feat(flows): remove deprecated MultipleCondition condition
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:25:08 +01:00
Loïc Mathieu
7f11774a5c fix(tests): add a sleep to be sure ES indexation happens before deleting 2025-11-04 14:25:08 +01:00
Loïc Mathieu
9693206374 feat(system): add a lock mechanism 2025-11-04 14:25:08 +01:00
Loïc Mathieu
2b1f81047a feat(flows): remove deprecated LocalFiles task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
9ce2541497 feat(flows): remove deprecated Pebble json function and filter
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
354ee5b233 feat(flows): remove deprecated EachParallel task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
7208aeec59 feat(flows): remove deprecated EachSequential
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
52a81a7547 feat(flows): remove deprecated flow update task endpoint
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Florian Hussonnois
a108d89c86 chore(core): add a core Disposable interface 2025-11-04 14:24:48 +01:00
Loïc Mathieu
e3a8811ed2 chore(system): switch new migrations to V3 2025-11-04 14:24:48 +01:00
Loïc Mathieu
efcd68dfd5 feat(system): remove deprecated code not used anymore 2025-11-04 14:24:48 +01:00
Loïc Mathieu
c5eccb6476 feat(system): remove task defaults
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
2a1118473e feat(flows): remove flow expand helper
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
d4244a4eb4 feat(flows): remove Templates
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
5e4be69dc9 feat(flows): remove the deprecated Echo task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
3b702597f5 feat(flows): remove deprecated ENUM inputs
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
03883bbeff feat(flows): remove deprecated BOOLEAN inputs
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
3231cd8b9c feat(flows): remove deprecated flow listeners
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
35b8364071 feat(flows): remove deprecated input name
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
brian.mulier
9294c9f885 chore(version): upgrade to v1.2.0-SNAPSHOT 2025-11-04 14:04:13 +01:00
brian.mulier
ee63c33ef3 fix(ci): failsafe global-create-new-release-branch.yml 2025-11-04 13:49:59 +01:00
Roman Acevedo
d620dd7dec test: set retryWithFlowableErrors as FlakyTest 2025-11-04 13:46:14 +01:00
brian.mulier
02425586d6 fix(ci): failsafe global-create-new-release-branch.yml 2025-11-04 13:44:44 +01:00
brian.mulier
56d48ddf32 fix(ci): failsafe global-create-new-release-branch.yml 2025-11-04 13:41:32 +01:00
brian.mulier
1a5c79827b fix(ci): failsafe global-create-new-release-branch.yml 2025-11-04 13:36:06 +01:00
Florian Hussonnois
08b20fda68 fix(core): resolution of plugin must be done with a stable version 2025-11-04 11:29:47 +01:00
François Delbrayelle
7192ad1494 doc(http/request): fix doc about basic auth (#12626) 2025-11-04 10:44:36 +01:00
YannC
f164cddf7a Fix/sdk changes (#12411) (#12617)
* Fix/sdk changes (#12411)

* fix: kv controller remove namespace check

* clean(API): add query to filter parameter

* fix: flow update not deprecated

* clean(API): add deprecated on open api

* feat: executions annotations for skipping, follow method generation in sdk

* feat: add typing indication to validateTask

* fix(flowController): set correct hidden for json method in

* fix: optional params in delete executions endpoints

* fix: inputs/outputs as object

* change KV schema type to be object

* add back , deprecated = false on flow update, otherwise its marked as deprecated

* Revert "add back , deprecated = false on flow update, otherwise its marked as deprecated"

This reverts commit 3772404b68f14f0a80af9e0adb9952d58e9102b4.

* feat(API): add multipart to openAPI

* feat(API): add multipart to openAPI

* fix: only use plain-text for setKeyValue endpoint

* fix: KV command test

* chore: add multipart vendor annotations for custom generation on SDK

---------

Co-authored-by: YannC. <ycoornaert@kestra.io>
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>

* fix: kv test remove content type

---------

Co-authored-by: Roman Acevedo <roman.acevedo62@gmail.com>
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-11-04 08:37:08 +01:00
Barthélémy Ledoux
c1e18eb490 refactor: make typescript progress on logs (#12603) 2025-11-03 14:31:26 +01:00
Miloš Paunović
4365a108ac chore(core): enhance github issue templates (#12572) 2025-11-03 10:48:03 +00:00
brian-mulier-p
bb0e15a2cc fix(cli): avoid resaving existing metadata upon migration (#12607) 2025-11-03 11:16:03 +01:00
brian-mulier-p
3ab6d6a94f feat(cli): have separate commands for KV & secrets metadata migrations (#12585) 2025-11-03 09:33:26 +01:00
Krie
e116186201 feat: make charts default duration configurable, add kestra.ui.charts… (#12599)
* feat: make charts default duration configurable, add kestra.ui.charts.default-duration config parameter (default: PT720H/30 days)

* Update cli/src/main/resources/application.yml

Co-authored-by: YannC <37600690+Skraye@users.noreply.github.com>

* Update ui/src/components/flows/FlowRoot.vue

Co-authored-by: YannC <37600690+Skraye@users.noreply.github.com>

* Update ui/src/routes/routes.js

Co-authored-by: YannC <37600690+Skraye@users.noreply.github.com>

* feat: set P30D instead of PT720H for readability

---------

Co-authored-by: YannC <37600690+Skraye@users.noreply.github.com>
Co-authored-by: YannC. <ycoornaert@kestra.io>
2025-11-03 09:21:39 +01:00
Barthélémy Ledoux
6439671b91 fix: on producion, switch appear without fields (#12579) 2025-11-01 22:37:53 +01:00
Piyush Bhaskar
c044634381 fix(core): allow to show multiple labels and few cleanup (#12587) 2025-11-01 01:06:39 +05:30
Roman Acevedo
776ea0a93c ci: add dry run to release-docker.yml (#12586) 2025-10-31 20:02:21 +01:00
Piyush Bhaskar
a799ef8b64 fix(core): fix the pagination (#12569) 2025-10-31 20:57:17 +05:30
Loïc Mathieu
e2e4335771 fix(flows): fail flow validation for duplicate preconditions ID 2025-10-31 16:22:13 +01:00
Loïc Mathieu
f8b0d4217f fix(executions): Flow triggered twice when there are two multiple conditions
Fixes #12560
2025-10-31 16:22:13 +01:00
Pradumna Saraf
c594aa6764 chore(plugin): improve the OutputValues example 2025-10-31 15:15:44 +01:00
Piyush Bhaskar
d09bf5ac96 fix(core): lets have separate key and value input for labels for EQUALS, NOT_EQUALS operator (#12577) 2025-10-31 19:24:58 +05:30
Ishani Kundu
ef0a4e6b1a fix: Decrease the spacing between top of the filters and the bottom of the header (#12573)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-10-31 17:35:34 +05:30
Piyush Bhaskar
5f81c19fc7 refactor(triggers): clean usage of trigger_state (#12568) 2025-10-31 16:36:03 +05:30
Mohammad Zaki
701f7e22d8 refactor(core): convert LogLevelSelector.vue to TypeScript and Composition API (#12556)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-10-31 15:13:34 +05:30
Miloš Paunović
4bf469c992 fix(core): ensure "clicking fix with ai" button always shows and focuses the code panel (#12565)
Closes https://github.com/kestra-io/kestra/issues/12504.
2025-10-31 10:27:11 +01:00
Roman Acevedo
71e49f9eb5 feat(executions): add IN, NOT_IN, CONTAINS LABELS #11916
- advance on https://github.com/kestra-io/kestra/issues/11587
- companion PR: https://github.com/kestra-io/kestra-ee/pull/5617
2025-10-31 10:20:05 +01:00
Ishani Kundu
76e9b2269f refactor(core): convert Plugin.vue component to ts with Composition API (#12559)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-10-31 14:45:48 +05:30
github-actions[bot]
c3f34e1c2a chore(core): localize to languages other than english (#12555)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-10-31 08:10:36 +01:00
Ludovic DEHON
e01e8d8fe0 feat(tasks): add a params for http tasks
close #12507
2025-10-31 00:40:49 +01:00
Dnyanesh Pise
7c5092f281 fix(ui): prevent marking fields as error on login (Fix #12548) (#12554) 2025-10-30 23:34:34 +05:30
Loïc Mathieu
e025677e70 fix(executions): set the execution to KILLING and not RESTARTED when killing a paused flow
Fixes https://github.com/kestra-io/kestra/issues/12417
2025-10-30 18:11:17 +01:00
Bart Ledoux
a3195c8e64 feat(ui): add concurency limit page and route 2025-10-30 17:49:33 +01:00
Loïc Mathieu
9920d190c8 feat(system): add an administration page for concurrency limit
Closes  #11250
2025-10-30 17:49:33 +01:00
brian.mulier
2b29a36850 fix(kv): get value doesn't need metadata migration
Also purge expired kv in metadata migrate command
2025-10-30 16:57:57 +01:00
brian.mulier
07e90de835 fix(core): CrudEvent should not be done on the repository side for KV 2025-10-30 16:57:57 +01:00
Loïc Mathieu
1c097209ac fix(flows): subflow validation could fail in Elasticsearch
As you cannot eq on a null field
2025-10-30 16:29:41 +01:00
Krie
ca70743329 chore(core): clarify usage of vite environment variables in docs (#12520)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-30 16:18:37 +01:00
github-actions[bot]
5d2c93b232 chore(core): localize to languages other than english (#12550)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-10-30 16:16:04 +01:00
Loïc Mathieu
bc7291b8e3 chore(deps): fix OpenTelemetry proto so it works with Protobuf 3
Fixes https://github.com/kestra-io/kestra/issues/12298
2025-10-30 15:47:10 +01:00
Loïc Mathieu
c06ffb3063 feat(system): set taskrun attempt to resubmitted when a taskrun is resubmitted to a worker
Closes https://github.com/kestra-io/kestra/issues/12481
2025-10-30 15:46:05 +01:00
Barthélémy Ledoux
7c89eec500 fix(nocode): switch statements should display corectly (#12509)
Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-10-30 15:34:15 +01:00
Ritoban Dutta
45592597e7 chore: convert StateChart.vue component to TypeScript with Composition API (#12537)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-10-30 17:32:47 +05:30
Hemant M Mehta
313fda153a fix: avoid reusing jq scope and revert unnecessary test changes
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
6c3bbcea4d fix: Move Scope initialization to static block
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
53b46f11aa fix: Increase wait timeout to 120s for restartFlowable tests to fix timing issues
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
9396e73f5a fix: Update ExecutionServiceTest replayFlowable
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
d02b6b0470 fix: static call issue
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
bdfd324a7d fix: update to version
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
551f6fe033 fix: updated the version
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
7a0b3843e1 fix: jq-filter-zip-exception
closes: #11683
2025-10-30 12:53:44 +01:00
Dheeraj_R_Gowda
d713f2753b refactor(ui): convert TemplateEdit.vue to TypeScript using script setup (#12530)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-10-30 16:55:35 +05:30
Loïc Mathieu
bc27e0ea9e chore(executions): deprecate the state store for the KVStore 2025-10-30 11:30:36 +01:00
Piyush Bhaskar
08f4b2ea22 fix(core): properly apply the saved filter containing date range. (#12541) 2025-10-30 15:31:43 +05:30
Barthélémy Ledoux
b64168f115 fix: save tabs state onCreate (#12517) 2025-10-30 10:44:53 +01:00
Piyush Bhaskar
b23aa3eb1a fix(core): make popper open fast (#12533) 2025-10-30 12:51:47 +05:30
Piyush Bhaskar
70b5c03fb2 fix(core): remove the scope from Logs and show the chart by default (#12532) 2025-10-30 12:39:22 +05:30
Piyush Bhaskar
094802dd85 fix(filter): make reset and reset to default of pre applied filter robust. (#12508) 2025-10-30 12:32:20 +05:30
brian.mulier
d9144c8c4f feat(core): introduce KV Metadata in-repository storing (#12342)
part of https://github.com/kestra-io/kestra/issues/12341
2025-10-29 17:18:43 +01:00
brian.mulier
b18d304b77 fix(kv): properly serialize durations 2025-10-29 17:18:43 +01:00
brian.mulier
c38cac5a9d fix(tests): concurrency-safe Template emits 2025-10-29 17:18:43 +01:00
brian.mulier
4ed44754ab fix(core): use index by adding deleted everytime in query 2025-10-29 17:18:43 +01:00
Florian Hussonnois
e62baaabe4 fix(core): fix PluginCatalogService resolve method 2025-10-29 17:10:32 +01:00
Nicolas K.
efac416863 feat(core): force telemetry when license requires it (#12512)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-10-29 17:03:45 +01:00
François Delbrayelle
d26956fc89 doc(http/request): fix doc about basic auth (#12510) 2025-10-29 16:56:27 +01:00
Diksha Ajaykumar Nigam
03a5c52445 refactor(ui): convert LeftMenu.vue to TypeScript using script setup (#12440) 2025-10-29 15:49:24 +01:00
Barthélémy Ledoux
290e0c5ded fix: avoid refreshing token when impersonated (#12476) 2025-10-29 15:47:52 +01:00
Ashwini Kumar
1c0e0fd926 refactor(ui): Convert LeftMenuLink.vue to TypeScript (#12431) 2025-10-29 15:27:19 +01:00
YannC
9042e86f12 fix: make sure taskOutputs is never set as a Variables map (#12484)
close #11967
2025-10-29 15:25:09 +01:00
Barthélémy Ledoux
c6be8798d6 fix: show images in editor (#12503)
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
2025-10-29 15:16:23 +01:00
Barthélémy Ledoux
452ac83b01 fix: open tasks from topo in other panel (#12432) 2025-10-29 15:15:33 +01:00
dependabot[bot]
3dd198f036 build(deps): bump software.amazon.awssdk:bom from 2.35.11 to 2.36.3
Bumps software.amazon.awssdk:bom from 2.35.11 to 2.36.3.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.36.3
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 15:01:51 +01:00
Florian Hussonnois
228863d91a fix(test): fix test on FlowValidationTest 2025-10-29 14:19:56 +01:00
dependabot[bot]
8b17a7c36d build(deps): bump dev.langchain4j:langchain4j-community-bom
Bumps [dev.langchain4j:langchain4j-community-bom](https://github.com/langchain4j/langchain4j-community) from 1.7.1-beta14 to 1.8.0-beta15.
- [Release notes](https://github.com/langchain4j/langchain4j-community/releases)
- [Commits](https://github.com/langchain4j/langchain4j-community/compare/1.7.1-beta14...1.8.0-beta15)

---
updated-dependencies:
- dependency-name: dev.langchain4j:langchain4j-community-bom
  dependency-version: 1.8.0-beta15
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 14:17:02 +01:00
Loïc Mathieu
55a8896181 chore(tests): set FlowControllerTest.updateFlowFlowFromJsonFromString as flaky
It fails often in CI and as the tested endpoint is deprecated it is not important that the test pass.
2025-10-29 14:12:32 +01:00
dependabot[bot]
fc600cc1e3 build(deps): bump dev.langchain4j:langchain4j-bom from 1.7.1 to 1.8.0
Bumps [dev.langchain4j:langchain4j-bom](https://github.com/langchain4j/langchain4j) from 1.7.1 to 1.8.0.
- [Release notes](https://github.com/langchain4j/langchain4j/releases)
- [Commits](https://github.com/langchain4j/langchain4j/compare/1.7.1...1.8.0)

---
updated-dependencies:
- dependency-name: dev.langchain4j:langchain4j-bom
  dependency-version: 1.8.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 14:11:54 +01:00
dependabot[bot]
fa23081207 build(deps): bump @swc/core-linux-x64-gnu from 1.13.21 to 1.14.0 in /ui (#12494)
Bumps [@swc/core-linux-x64-gnu](https://github.com/swc-project/swc) from 1.13.21 to 1.14.0.
- [Release notes](https://github.com/swc-project/swc/releases)
- [Changelog](https://github.com/swc-project/swc/blob/main/CHANGELOG.md)
- [Commits](https://github.com/swc-project/swc/compare/v1.13.21...v1.14.0)

---
updated-dependencies:
- dependency-name: "@swc/core-linux-x64-gnu"
  dependency-version: 1.14.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-29 13:16:38 +01:00
Piyush Bhaskar
2b04192d1b fix(core): avoid hot reload on enter in input (#12490) 2025-10-29 17:46:29 +05:30
dependabot[bot]
b7fbdf8aed build(deps): bump @swc/core-darwin-x64 from 1.13.21 to 1.14.0 in /ui (#12495)
Bumps [@swc/core-darwin-x64](https://github.com/swc-project/swc) from 1.13.21 to 1.14.0.
- [Release notes](https://github.com/swc-project/swc/releases)
- [Changelog](https://github.com/swc-project/swc/blob/main/CHANGELOG.md)
- [Commits](https://github.com/swc-project/swc/compare/v1.13.21...v1.14.0)

---
updated-dependencies:
- dependency-name: "@swc/core-darwin-x64"
  dependency-version: 1.14.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-29 13:15:40 +01:00
dependabot[bot]
5a95fcf1ff build(deps): bump @swc/core-darwin-arm64 from 1.13.21 to 1.14.0 in /ui (#12489)
Bumps [@swc/core-darwin-arm64](https://github.com/swc-project/swc) from 1.13.21 to 1.14.0.
- [Release notes](https://github.com/swc-project/swc/releases)
- [Changelog](https://github.com/swc-project/swc/blob/main/CHANGELOG.md)
- [Commits](https://github.com/swc-project/swc/compare/v1.13.21...v1.14.0)

---
updated-dependencies:
- dependency-name: "@swc/core-darwin-arm64"
  dependency-version: 1.14.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-29 13:13:27 +01:00
Miloš Paunović
558ca24dac chore(deps): regular dependency update (#12473)
Performing a weekly round of dependency updates in the NPM ecosystem to keep everything up to date.
2025-10-29 13:03:00 +01:00
dependabot[bot]
1ffc60fe07 build(deps): bump org.sonarqube from 7.0.0.6105 to 7.0.1.6134
Bumps org.sonarqube from 7.0.0.6105 to 7.0.1.6134.

---
updated-dependencies:
- dependency-name: org.sonarqube
  dependency-version: 7.0.1.6134
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 12:39:14 +01:00
Your Name
4cdbb5f57e Remove duplicate test and explicitly use Integer.valueOf in chunkWithIntegerVariable 2025-10-29 12:36:00 +01:00
Your Name
3f27645b3c test(core): add reproducer for Integer to Long casting issue in chunk filter 2025-10-29 12:36:00 +01:00
Your Name
a897618108 fix(core): handle integer size in chunk Pebble filter 2025-10-29 12:36:00 +01:00
Piyush Bhaskar
cb9662cbd7 fix(core): tweaks for dropdown bg and shadow (#12479) 2025-10-29 16:14:27 +05:30
Naveen Gowda MY
c60be5c9f8 feat: add error feedback and validation to login form Fixes #12361 (#12472)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
2025-10-29 15:44:15 +05:30
dependabot[bot]
ec74c1ae51 build(deps): bump com.mysql:mysql-connector-j from 9.4.0 to 9.5.0
Bumps [com.mysql:mysql-connector-j](https://github.com/mysql/mysql-connector-j) from 9.4.0 to 9.5.0.
- [Changelog](https://github.com/mysql/mysql-connector-j/blob/release/9.x/CHANGES)
- [Commits](https://github.com/mysql/mysql-connector-j/compare/9.4.0...9.5.0)

---
updated-dependencies:
- dependency-name: com.mysql:mysql-connector-j
  dependency-version: 9.5.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 11:02:06 +01:00
dependabot[bot]
ded9e8c13a build(deps): bump software.amazon.awssdk.crt:aws-crt
Bumps [software.amazon.awssdk.crt:aws-crt](https://github.com/awslabs/aws-crt-java) from 0.39.3 to 0.39.4.
- [Release notes](https://github.com/awslabs/aws-crt-java/releases)
- [Commits](https://github.com/awslabs/aws-crt-java/compare/v0.39.3...v0.39.4)

---
updated-dependencies:
- dependency-name: software.amazon.awssdk.crt:aws-crt
  dependency-version: 0.39.4
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 11:00:40 +01:00
dependabot[bot]
fcb2d18beb build(deps): bump actions/upload-artifact from 4 to 5
Bumps [actions/upload-artifact](https://github.com/actions/upload-artifact) from 4 to 5.
- [Release notes](https://github.com/actions/upload-artifact/releases)
- [Commits](https://github.com/actions/upload-artifact/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/upload-artifact
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 11:00:09 +01:00
dependabot[bot]
c3bc919891 build(deps): bump com.google.cloud:libraries-bom from 26.70.0 to 26.71.0
Bumps [com.google.cloud:libraries-bom](https://github.com/googleapis/java-cloud-bom) from 26.70.0 to 26.71.0.
- [Release notes](https://github.com/googleapis/java-cloud-bom/releases)
- [Changelog](https://github.com/googleapis/java-cloud-bom/blob/main/release-please-config.json)
- [Commits](https://github.com/googleapis/java-cloud-bom/compare/v26.70.0...v26.71.0)

---
updated-dependencies:
- dependency-name: com.google.cloud:libraries-bom
  dependency-version: 26.71.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 10:59:40 +01:00
brian-mulier-p
03542e91f3 fix(core): show tasks in JSON Schema for Switch.cases (#12478)
part of #10508
2025-10-29 10:26:38 +01:00
Florian Hussonnois
958ee1ef8a fix(system): add resolveVersions method to PluginCatalogService
Related-to: kestra-io/kestra-ee#5171
2025-10-29 10:16:59 +01:00
Piyush Bhaskar
a27348b872 feat(core): add support for single datetime as well (#12471) 2025-10-29 13:35:37 +05:30
Roman Acevedo
36aedec8f0 ci: add skip test param to pre-release.yml 2025-10-28 17:53:57 +01:00
Pratik Murari
9499cfc955 refactor: convert RouterMd.vue component to TypeScript (#12429)
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
2025-10-28 16:41:32 +01:00
Anna Geller
d3d14a252b feat: add cloud formation template to deploy Kestra (#12412)
* feat: add cloud formation template to deploy Kestra

* Update README.md
2025-10-28 15:45:43 +01:00
Ridham Anand
425af2a530 refactor(layout): convert DefaultLayout.vue to TypeScript using Composition API (#12424)
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-10-28 15:32:57 +01:00
Arya Soni
0bae8cdbe9 Convert ErrorToastContainer.vue component to TS (#12418)
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-10-28 15:28:41 +01:00
Ravi kumar
b9a5a74674 refactor(ui): convert Settings.vue to TS composition API (#12413)
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-10-28 15:24:19 +01:00
Piyush Bhaskar
222fae2a22 fix(core): add dynamic rowKey for selection handling (#12428) 2025-10-28 19:11:14 +05:30
Loïc Mathieu
4502c52d2b fix(executions): remove errors and finally tasks when restarting
Otherwize we would detect that an error or a finally branch is processing and the flowable state would not be correctly taken.

Moreover, it prevent this branch to be taken again after a restart.

Fixes #11731
2025-10-28 14:29:27 +01:00
Florian Hussonnois
153ac27040 fix(flows): KV pebble expressions with input defaults (#12314)
Fixes: #12314
2025-10-28 14:29:03 +01:00
Florian Hussonnois
6361a02deb feat(core): add prefill prop to input to allow nullable value (#11819)
Added a new 'prefill' property for all inputs
to specify an optional UI hint for pre-filling the input,while
allowing the input to be nullable.

Fixes: #11819
2025-10-28 14:21:26 +01:00
Piyush Bhaskar
163e1e2c8b chore(version): bump ui-libs for a fix. (#12425) 2025-10-28 18:20:54 +05:30
(Tum) Poomtum Rattanarat
07b5e89a2f fix(ui): align label form field in no-code editor (#12144)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
2025-10-28 17:14:57 +05:30
github-actions[bot]
a3ff8f5c2b chore(core): localize to languages other than english (#12423)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-10-28 12:35:33 +01:00
Irfan
4cd369e44d feat(core): add type validation to file inputs (#12176)
Closes https://github.com/kestra-io/kestra/issues/11266.

Co-authored-by: iitzIrFan <irfanlhawk@gmail.com>
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-28 12:33:10 +01:00
Carlos Longhi
364540c45a chore(flows): highlight the id field as a link (#12414)
Closes https://github.com/kestra-io/kestra/issues/12365.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-28 12:22:55 +01:00
Pratik Murari
65b8958fe8 fix(core): use correct formatting for tags in blueprints for list and details view (#12374)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-10-28 16:40:57 +05:30
Dheeraj_R_Gowda
e9be141463 Fix: update menu background color (#12366) 2025-10-28 16:39:33 +05:30
Omar Moustafa
69804790fb Fix code snippets overlaying main UI in execution outputs (#12371) 2025-10-28 15:42:10 +05:30
Shatrughan
4a524196d4 refactor(core): convert vue component to typescript and composition api (#12416)
Closes https://github.com/kestra-io/kestra/issues/12397.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-28 11:11:30 +01:00
Barthélémy Ledoux
eeddfc7b1e fix(no-code): When anyof has array with different items (#12419) 2025-10-28 10:31:02 +01:00
Piyush Bhaskar
9f35f05188 feat(filter): introducing redesigned implemention of new filter. (#12265)
Co-authored-by: GitHub Action <actions@github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-10-28 15:00:03 +05:30
Adinath R
3984e92004 feat(ui): Redesigned the No Execution Flow Page to align with the rest (#12357)
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-10-28 12:07:09 +05:30
Florian Hussonnois
78c01999ad feat(triggers): add inputs property to webhook trigger
Add a new `inputs` property to the Webhook trigger, allowing input
data to be passed to the triggered flow. If no inputs are defined on the trigger,
the flow will not receive any inputs, even if some have default values.

This behavior ensures backward compatibility with how the Webhook trigger currently works.
2025-10-27 17:02:03 +01:00
Ludovic DEHON
ad13a64ccc fix: make dind example working, and add note for ubuntu users 2025-10-27 16:53:32 +01:00
Mohammad Shahid Beigh
b4017e96c3 refactor(core): convert FlowConcurrency.vue to TypeScript with Composition API (#12119)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
2025-10-27 18:31:23 +05:30
Piyush Bhaskar
b12b64fa40 fix(core): keep the selection with refresh or periodic refresh (#12343)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-27 17:30:47 +05:30
SarthakBorude
5b3ebae8e7 chore(core): update the color of addition line highlight for light mode in monaco editor (#12149)
Closes https://github.com/kestra-io/kestra/issues/11956.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-27 12:31:02 +01:00
Manikanta Pallapothu
516b1fb1c3 chore(core): update design details on news panel (#12155)
Closes https://github.com/kestra-io/kestra/issues/12032.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-27 11:44:17 +01:00
Pavan YDG
80befa98e9 chore(core): remove the top pagination from table views (#12335)
Closes https://github.com/kestra-io/kestra/issues/12293.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-27 11:21:43 +01:00
Piyush Bhaskar
322532a955 fix(core): handle namespace removal applied from setting (#12381) 2025-10-27 15:49:10 +05:30
skayliu
70ad7b5fa2 chore(core): clean code for "Warning: [text blocks] will remove trailing spaces" (#10511) 2025-10-27 10:24:26 +01:00
Piyush Bhaskar
1e14f92d6f fix(core): makke flow search reactive (#12376) 2025-10-27 14:06:49 +05:30
Ravi kumar
fb4e2ca950 fix: auto-refresh namespace Files panel after Playground execution completes (#12114)
* fix: ui-playground-namespace-files-refresh

* fix: refresh tab and duplicates file removal fix

* fix: use the loadNodes function to refresh

---------

Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-10-26 17:14:36 +05:30
Barthélémy Ledoux
ed352f8a2e fix: avoid multiple dropdowns in file explorer (#12369) 2025-10-26 12:28:28 +01:00
Barthélémy Ledoux
bd8670e9a5 refactor(ui): extract file tree store (#12299) 2025-10-24 15:12:56 +02:00
Akshay Yadav
1e1b954d0a fix(ui/no-code-editor): Style the disabled section like other items (#12064)
Co-authored-by: Barthélémy Ledoux <bledoux@kestra.io>
2025-10-24 14:35:26 +02:00
Miloš Paunović
4c636578ac chore(core): pass prop as a boolean to resolve console warning (#12339) 2025-10-24 13:44:56 +02:00
Abhyshek Bhalaji
0d1ccb2910 chore(core): add use button to the system namespace blueprints tab (#12336)
Closes https://github.com/kestra-io/kestra/issues/12169.

Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-10-24 13:30:37 +02:00
Sanjay Ramsinghani
edc4abc80e chore(core): introduce stronger repelling forces in the dependency view graph (#11910)
Closes https://github.com/kestra-io/kestra/issues/11583.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-24 12:14:22 +02:00
Prayag
ddf5690325 chore(core): prevent blinking on the dependencies page during loading (#11902)
Closes https://github.com/kestra-io/kestra/issues/11125.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-24 11:54:15 +02:00
brian-mulier-p
25fcf9695a fix(kv): don't throw in KV function with errorOnMissing=false for expired kv (#12321)
closes #12294
2025-10-24 11:20:40 +02:00
Abhyshek Bhalaji
920c614cc0 chore(core): update copilot button styles for accept and decline actions (#12277)
Closes https://github.com/kestra-io/kestra/issues/12057.

Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-10-24 09:49:13 +02:00
Miloš Paunović
1dc18fdb66 chore(deps): regular dependency update (#12328)
Performing a weekly round of dependency updates in the NPM ecosystem to keep everything up to date.
2025-10-24 08:36:17 +02:00
Barthélémy Ledoux
86c7b2f6ae fix(ui): avoid eating comments when changing desc (#12316) 2025-10-24 06:57:46 +02:00
Roman Acevedo
296ddb3b19 test(flaky): mark noGroup and flowWaitFailed as flaky 2025-10-23 20:44:27 +02:00
Maru Karthik Reddy
f3befd174c fix(system): replace deprecated setSerializationInclusion with setDefaultPropertyInclusion (#12315)
The method was deprecated in Jackson 2.7 and will be removed in Jackson 3.0.
This is a direct 1:1 replacement with identical serialization behavior.
2025-10-23 16:53:03 +02:00
dependabot[bot]
d09ce90be4 build(deps): bump opensearchRestVersion from 3.3.0 to 3.3.1
Bumps `opensearchRestVersion` from 3.3.0 to 3.3.1.

Updates `org.opensearch.client:opensearch-rest-client` from 3.3.0 to 3.3.1
- [Release notes](https://github.com/opensearch-project/OpenSearch/releases)
- [Changelog](https://github.com/opensearch-project/OpenSearch/blob/main/CHANGELOG.md)
- [Commits](https://github.com/opensearch-project/OpenSearch/compare/3.3.0...3.3.1)

Updates `org.opensearch.client:opensearch-rest-high-level-client` from 3.3.0 to 3.3.1
- [Release notes](https://github.com/opensearch-project/OpenSearch/releases)
- [Changelog](https://github.com/opensearch-project/OpenSearch/blob/main/CHANGELOG.md)
- [Commits](https://github.com/opensearch-project/OpenSearch/compare/3.3.0...3.3.1)

---
updated-dependencies:
- dependency-name: org.opensearch.client:opensearch-rest-client
  dependency-version: 3.3.1
  dependency-type: direct:production
  update-type: version-update:semver-patch
- dependency-name: org.opensearch.client:opensearch-rest-high-level-client
  dependency-version: 3.3.1
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-23 16:39:25 +02:00
dependabot[bot]
87e059a76b build(deps): bump protobufVersion from 3.25.5 to 3.25.8
Bumps `protobufVersion` from 3.25.5 to 3.25.8.

Updates `com.google.protobuf:protobuf-java` from 3.25.5 to 3.25.8
- [Release notes](https://github.com/protocolbuffers/protobuf/releases)
- [Changelog](https://github.com/protocolbuffers/protobuf/blob/main/protobuf_release.bzl)
- [Commits](https://github.com/protocolbuffers/protobuf/compare/v3.25.5...v3.25.8)

Updates `com.google.protobuf:protobuf-java-util` from 3.25.5 to 3.25.8

---
updated-dependencies:
- dependency-name: com.google.protobuf:protobuf-java
  dependency-version: 3.25.8
  dependency-type: direct:production
  update-type: version-update:semver-patch
- dependency-name: com.google.protobuf:protobuf-java-util
  dependency-version: 3.25.8
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-23 16:38:41 +02:00
Miloš Paunović
e58b271824 chore(deps): regular dependency update (#12268)
Performing a weekly round of dependency updates in the NPM ecosystem to keep everything up to date.
2025-10-23 15:51:42 +02:00
Loïc Mathieu
c1c46da324 fix(deps): dependabot config ignore in the wrong section 2025-10-23 15:48:58 +02:00
Barthélémy Ledoux
de6abc7650 fix: set lang properly with workers (#12286) 2025-10-23 14:56:50 +02:00
François Delbrayelle
6da0a74ac7 build: add plugin-jms in .plugins (temp for conapi) (#12289) 2025-10-23 14:19:11 +02:00
Bala Yokesh Mani A
df755361e1 refactor: remove unused Status component (#12287) 2025-10-23 17:23:55 +05:30
dependabot[bot]
918c026781 build(deps): bump com.github.oshi:oshi-core from 6.9.0 to 6.9.1
Bumps [com.github.oshi:oshi-core](https://github.com/oshi/oshi) from 6.9.0 to 6.9.1.
- [Release notes](https://github.com/oshi/oshi/releases)
- [Changelog](https://github.com/oshi/oshi/blob/master/CHANGELOG.md)
- [Commits](https://github.com/oshi/oshi/compare/oshi-parent-6.9.0...oshi-parent-6.9.1)

---
updated-dependencies:
- dependency-name: com.github.oshi:oshi-core
  dependency-version: 6.9.1
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-23 12:37:55 +02:00
Manuj Chadha
e03b1dbcbb fix: connect timeline dots by setting width to 100% in SCSS (#12281) 2025-10-23 15:41:01 +05:30
Loïc Mathieu
25acd73de0 chore(versions): ignore protobuf 4 versions as we still need 3 2025-10-23 12:05:52 +02:00
Loïc Mathieu
68ee7b80a0 chore(system): don't manage Micrometer and Micronaut OpenAPI
Their version from the Micronaut BOM is now recent enought for our usage.

Closes https://github.com/kestra-io/kestra/pull/12222
2025-10-23 12:05:40 +02:00
Nicolas K.
893e8c1a49 feat(flows): add human task (#12276)
* feat(flows): add human task

* clean(flows): move models and validation into plugin packages and move validation logic to the task

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-10-23 11:39:00 +02:00
Barthélémy Ledoux
f0ba570c3d refactor: remove FlowEditor.vue component (#12284) 2025-10-23 10:37:30 +02:00
Aniket Rathod
c2ab63ceba feat(ui): convert Curl.vue to TypeScript using Composition API #12079 (#12261)
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-10-23 10:17:38 +02:00
Shubham Singh
7a126d71e5 refactor(ui): Convert Timeline.vue to TS (#12270)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-10-23 13:40:47 +05:30
Dheeraj_R_Gowda
453477ecb9 fix(ui):improved opening animation for Copilot popin (#12156) 2025-10-23 08:54:43 +02:00
Ananya44444
3f83aaa437 Convert InheritedKVs.vue to TypeScript (#12280) 2025-10-23 08:54:20 +02:00
748 changed files with 27856 additions and 21885 deletions

View File

@@ -32,11 +32,6 @@ In the meantime, you can move onto the next step...
### Development:
- (Optional) By default, your dev server will target `localhost:8080`. If your backend is running elsewhere, you can create `.env.development.local` under `ui` folder with this content:
```
VITE_APP_API_URL={myApiUrl}
```
- Navigate into the `ui` folder and run `npm install` to install the dependencies for the frontend project.
- Now go to the `cli/src/main/resources` folder and create a `application-override.yml` file.

View File

@@ -1,5 +1,8 @@
name: Bug report
description: File a bug report
description: Report a bug or unexpected behavior in the project
labels: ["bug", "area/backend", "area/frontend"]
body:
- type: markdown
attributes:
@@ -20,7 +23,3 @@ body:
- Kestra Version: develop
validations:
required: false
labels:
- bug
- area/backend
- area/frontend

View File

@@ -1,4 +1,4 @@
contact_links:
- name: Chat
url: https://kestra.io/slack
about: Chat with us on Slack.
about: Chat with us on Slack

View File

@@ -1,5 +1,8 @@
name: Feature request
description: Create a new feature request
description: Suggest a new feature or improvement to enhance the project
labels: ["enhancement", "area/backend", "area/frontend"]
body:
- type: textarea
attributes:
@@ -7,7 +10,3 @@ body:
placeholder: Tell us more about your feature request. Don't forget to give us a star! ⭐
validations:
required: true
labels:
- enhancement
- area/backend
- area/frontend

View File

@@ -26,6 +26,10 @@ updates:
open-pull-requests-limit: 50
labels:
- "dependency-upgrade"
ignore:
- dependency-name: "com.google.protobuf:*"
# Ignore versions of Protobuf that are equal to or greater than 4.0.0 as Orc still uses 3
versions: [ "[4,)" ]
# Maintain dependencies for NPM modules
- package-ecosystem: "npm"

View File

@@ -64,7 +64,8 @@ jobs:
cd kestra
# Create and push release branch
git checkout -b "$PUSH_RELEASE_BRANCH";
git checkout -B "$PUSH_RELEASE_BRANCH";
git pull origin "$PUSH_RELEASE_BRANCH" --rebase || echo "No existing branch to pull";
git push -u origin "$PUSH_RELEASE_BRANCH";
# Run gradle release

View File

@@ -5,6 +5,15 @@ on:
tags:
- 'v*'
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
jobs:
build-artifacts:
@@ -14,6 +23,7 @@ jobs:
backend-tests:
name: Backend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
@@ -23,6 +33,7 @@ jobs:
frontend-tests:
name: Frontend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

View File

@@ -13,11 +13,11 @@ on:
required: true
type: boolean
default: false
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
dry-run:
description: 'Dry run mode that will not write or release anything'
required: true
type: boolean
default: false
jobs:
publish-docker:
@@ -25,9 +25,9 @@ jobs:
if: startsWith(github.ref, 'refs/tags/v')
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
with:
plugin-version: ${{ inputs.plugin-version }}
retag-latest: ${{ inputs.retag-latest }}
retag-lts: ${{ inputs.retag-lts }}
dry-run: ${{ inputs.dry-run }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}

View File

@@ -43,7 +43,7 @@ jobs:
# Upload dependency check report
- name: Upload dependency check report
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v5
if: ${{ always() }}
with:
name: dependency-check-report

View File

@@ -66,6 +66,7 @@
#plugin-jdbc:io.kestra.plugin:plugin-jdbc-sybase:LATEST
#plugin-jenkins:io.kestra.plugin:plugin-jenkins:LATEST
#plugin-jira:io.kestra.plugin:plugin-jira:LATEST
#plugin-jms:io.kestra.plugin:plugin-jms:LATEST
#plugin-kafka:io.kestra.plugin:plugin-kafka:LATEST
#plugin-kestra:io.kestra.plugin:plugin-kestra:LATEST
#plugin-kubernetes:io.kestra.plugin:plugin-kubernetes:LATEST

View File

@@ -68,6 +68,12 @@ Kestra is an open-source, event-driven orchestration platform that makes both **
## 🚀 Quick Start
### Launch on AWS (CloudFormation)
Deploy Kestra on AWS using our CloudFormation template:
[![Launch Stack](https://cdn.rawgit.com/buildkite/cloudformation-launch-stack-button-svg/master/launch-stack.svg)](https://console.aws.amazon.com/cloudformation/home#/stacks/create/review?templateURL=https://kestra-deployment-templates.s3.eu-west-3.amazonaws.com/aws/cloudformation/ec2-rds-s3/kestra-oss.yaml&stackName=kestra-oss)
### Get Started Locally in 5 Minutes
#### Launch Kestra in Docker

View File

@@ -21,7 +21,7 @@ plugins {
// test
id "com.adarshr.test-logger" version "4.0.0"
id "org.sonarqube" version "7.0.0.6105"
id "org.sonarqube" version "7.0.1.6134"
id 'jacoco-report-aggregation'
// helper

View File

@@ -7,7 +7,6 @@ import io.kestra.cli.commands.namespaces.NamespaceCommand;
import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.micronaut.configuration.picocli.MicronautFactory;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
@@ -39,7 +38,6 @@ import java.util.concurrent.Callable;
PluginCommand.class,
ServerCommand.class,
FlowCommand.class,
TemplateCommand.class,
SysCommand.class,
ConfigCommand.class,
NamespaceCommand.class,

View File

@@ -4,6 +4,7 @@ import io.kestra.core.runners.*;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.executor.DefaultExecutor;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
@@ -49,7 +50,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
running.set(true);
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
if (workerEnabled) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils

View File

@@ -1,36 +0,0 @@
package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlParser;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "expand",
description = "Deprecated - expand a flow"
)
@Deprecated
public class FlowExpandCommand extends AbstractCommand {
@CommandLine.Parameters(index = "0", description = "The flow file to expand")
private Path file;
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
super.call();
stdErr("Warning, this functionality is deprecated and will be removed at some point.");
String content = IncludeHelperExpander.expand(Files.readString(file), file.getParent());
Flow flow = YamlParser.parse(content, Flow.class);
modelValidator.validate(flow);
stdOut(content);
return 0;
}
}

View File

@@ -21,6 +21,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "updates",
description = "Create or update flows from a folder, and optionally delete the ones not present",
@@ -41,7 +43,6 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
@Inject
private TenantIdSelectorService tenantIdSelectorService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
super.call();
@@ -50,13 +51,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.toList();
String body = "";

View File

@@ -1,40 +0,0 @@
package io.kestra.cli.commands.flows;
import com.google.common.io.Files;
import lombok.SneakyThrows;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
@Deprecated
public abstract class IncludeHelperExpander {
public static String expand(String value, Path directory) throws IOException {
return value.lines()
.map(line -> line.contains("[[>") && line.contains("]]") ? expandLine(line, directory) : line)
.collect(Collectors.joining("\n"));
}
@SneakyThrows
private static String expandLine(String line, Path directory) {
String prefix = line.substring(0, line.indexOf("[[>"));
String suffix = line.substring(line.indexOf("]]") + 2, line.length());
String file = line.substring(line.indexOf("[[>") + 3 , line.indexOf("]]")).strip();
Path includePath = directory.resolve(file);
List<String> include = Files.readLines(includePath.toFile(), Charset.defaultCharset());
// handle single line directly with the suffix (should be between quotes or double-quotes
if(include.size() == 1) {
String singleInclude = include.getFirst();
return prefix + singleInclude + suffix;
}
// multi-line will be expanded with the prefix but no suffix
return include.stream()
.map(includeLine -> prefix + includeLine)
.collect(Collectors.joining("\n"));
}
}

View File

@@ -2,7 +2,6 @@ package io.kestra.cli.commands.flows.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.commands.flows.IncludeHelperExpander;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
@@ -21,6 +20,8 @@ import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "update",
description = "Update flows in namespace",
@@ -44,13 +45,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.toList();
String body = "";

View File

@@ -1,28 +0,0 @@
package io.kestra.cli.commands.migrations;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "metadata",
description = "populate metadata for entities"
)
@Slf4j
public class MetadataMigrationCommand extends AbstractCommand {
@Inject
private MetadataMigrationService metadataMigrationService;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationService.migrateMetadata();
System.out.println("✅ Metadata migration complete.");
return 0;
} catch (Exception e) {
return 1;
}
}
}

View File

@@ -1,11 +0,0 @@
package io.kestra.cli.commands.migrations;
import jakarta.inject.Singleton;
@Singleton
public class MetadataMigrationService {
public int migrateMetadata() {
// no-op
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.migrations;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.migrations.metadata.MetadataMigrationCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

View File

@@ -0,0 +1,30 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "kv",
description = "populate metadata for KV"
)
@Slf4j
public class KvMetadataMigrationCommand extends AbstractCommand {
@Inject
private MetadataMigrationService metadataMigrationService;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationService.kvMigration();
} catch (Exception e) {
System.err.println("❌ KV Metadata migration failed: " + e.getMessage());
e.printStackTrace();
return 1;
}
System.out.println("✅ KV Metadata migration complete.");
return 0;
}
}

View File

@@ -0,0 +1,23 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "metadata",
description = "populate metadata for entities",
subcommands = {
KvMetadataMigrationCommand.class,
SecretsMetadataMigrationCommand.class
}
)
@Slf4j
public class MetadataMigrationCommand extends AbstractCommand {
@Override
public Integer call() throws Exception {
super.call();
return 0;
}
}

View File

@@ -0,0 +1,89 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.tenant.TenantService;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Singleton
public class MetadataMigrationService {
@Inject
private TenantService tenantService;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private KvMetadataRepositoryInterface kvMetadataRepository;
@Inject
private StorageInterface storageInterface;
protected Map<String, List<String>> namespacesPerTenant() {
String tenantId = tenantService.resolveTenant();
return Map.of(tenantId, flowRepository.findDistinctNamespace(tenantId));
}
public void kvMigration() throws IOException {
this.namespacesPerTenant().entrySet().stream()
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
.flatMap(throwFunction(namespaceForTenant -> {
InternalKVStore kvStore = new InternalKVStore(namespaceForTenant.getKey(), namespaceForTenant.getValue(), storageInterface, kvMetadataRepository);
List<FileAttributes> list = listAllFromStorage(storageInterface, namespaceForTenant.getKey(), namespaceForTenant.getValue());
Map<Boolean, List<KVEntry>> entriesByIsExpired = list.stream()
.map(throwFunction(fileAttributes -> KVEntry.from(namespaceForTenant.getValue(), fileAttributes)))
.collect(Collectors.partitioningBy(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isAfter(expirationDate)).orElse(false)));
entriesByIsExpired.get(true).forEach(kvEntry -> {
try {
storageInterface.delete(
namespaceForTenant.getKey(),
namespaceForTenant.getValue(),
kvStore.storageUri(kvEntry.key())
);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return entriesByIsExpired.get(false).stream().map(kvEntry -> PersistedKvMetadata.from(namespaceForTenant.getKey(), kvEntry));
}))
.forEach(throwConsumer(kvMetadata -> {
if (kvMetadataRepository.findByName(kvMetadata.getTenantId(), kvMetadata.getNamespace(), kvMetadata.getName()).isEmpty()) {
kvMetadataRepository.save(kvMetadata);
}
}));
}
public void secretMigration() throws Exception {
throw new UnsupportedOperationException("Secret migration is not needed in the OSS version");
}
private static List<FileAttributes> listAllFromStorage(StorageInterface storage, String tenant, String namespace) throws IOException {
try {
return storage.list(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace)));
} catch (FileNotFoundException e) {
return Collections.emptyList();
}
}
}

View File

@@ -0,0 +1,30 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "secrets",
description = "populate metadata for secrets"
)
@Slf4j
public class SecretsMetadataMigrationCommand extends AbstractCommand {
@Inject
private MetadataMigrationService metadataMigrationService;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationService.secretMigration();
} catch (Exception e) {
System.err.println("❌ Secrets Metadata migration failed: " + e.getMessage());
e.printStackTrace();
return 1;
}
System.out.println("✅ Secrets Metadata migration complete.");
return 0;
}
}

View File

@@ -62,7 +62,7 @@ public class KvUpdateCommand extends AbstractApiCommand {
Duration ttl = expiration == null ? null : Duration.parse(expiration);
MutableHttpRequest<String> request = HttpRequest
.PUT(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/kv/" + key, value)
.contentType(MediaType.APPLICATION_JSON_TYPE);
.contentType(MediaType.TEXT_PLAIN);
if (ttl != null) {
request.header("ttl", ttl.toString());

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.runners.Executor;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
@@ -64,7 +64,7 @@ public class ExecutorCommand extends AbstractServerCommand {
super.call();
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
Executor executorService = applicationContext.getBean(Executor.class);
executorService.run();
Await.until(() -> !this.applicationContext.isRunning());

View File

@@ -7,7 +7,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.services.ConcurrencyLimitService;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStateStore;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -47,7 +47,7 @@ public class SubmitQueuedCommand extends AbstractCommand {
return 1;
}
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStateStore.class);
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {

View File

@@ -1,7 +1,6 @@
package io.kestra.cli.commands.sys;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.kestra.cli.commands.sys.statestore.StateStoreCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
@@ -16,7 +15,6 @@ import picocli.CommandLine;
ReindexCommand.class,
DatabaseCommand.class,
SubmitQueuedCommand.class,
StateStoreCommand.class
}
)
@Slf4j

View File

@@ -1,27 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import picocli.CommandLine;
@CommandLine.Command(
name = "state-store",
description = "Manage Kestra State Store",
mixinStandardHelpOptions = true,
subcommands = {
StateStoreMigrateCommand.class,
}
)
public class StateStoreCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "sys", "state-store", "--help");
return 0;
}
}

View File

@@ -1,81 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Slugify;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@CommandLine.Command(
name = "migrate",
description = "Migrate old state store files to use the new KV Store implementation.",
mixinStandardHelpOptions = true
)
@Slf4j
public class StateStoreMigrateCommand extends AbstractCommand {
@Inject
private ApplicationContext applicationContext;
@Override
public Integer call() throws Exception {
super.call();
FlowRepositoryInterface flowRepository = this.applicationContext.getBean(FlowRepositoryInterface.class);
StorageInterface storageInterface = this.applicationContext.getBean(StorageInterface.class);
RunContextFactory runContextFactory = this.applicationContext.getBean(RunContextFactory.class);
flowRepository.findAllForAllTenants().stream().map(flow -> Map.entry(flow, List.of(
URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of(flow.getId()) + "/states"),
URI.create("/" + flow.getNamespace().replace(".", "/") + "/states")
))).map(potentialStateStoreUrisForAFlow -> Map.entry(potentialStateStoreUrisForAFlow.getKey(), potentialStateStoreUrisForAFlow.getValue().stream().flatMap(uri -> {
try {
return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), potentialStateStoreUrisForAFlow.getKey().getNamespace(), uri, false).stream();
} catch (IOException e) {
return Stream.empty();
}
}).toList())).forEach(stateStoreFileUrisForAFlow -> stateStoreFileUrisForAFlow.getValue().forEach(stateStoreFileUri -> {
Flow flow = stateStoreFileUrisForAFlow.getKey();
String[] flowQualifierWithStateQualifiers = stateStoreFileUri.getPath().split("/states/");
String[] statesUriPart = flowQualifierWithStateQualifiers[1].split("/");
String stateName = statesUriPart[0];
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
String stateSubName = statesUriPart[statesUriPart.length - 1];
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
storageInterface.delete(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
stdOut("Successfully ran the state-store migration.");
return 0;
}
private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
Map<String, String> flowVariables = new HashMap<>();
flowVariables.put("tenantId", flow.getTenantId());
flowVariables.put("id", flow.getId());
flowVariables.put("namespace", flow.getNamespace());
return runContextFactory.of(flow, Map.of("flow", flowVariables));
}
}

View File

@@ -1,34 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "template",
description = "Manage templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceCommand.class,
TemplateValidateCommand.class,
TemplateExportCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "template", "--help");
return 0;
}
}

View File

@@ -1,61 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "export",
description = "Export templates to a ZIP file",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateExportCommand extends AbstractApiCommand {
private static final String DEFAULT_FILE_NAME = "templates.zip";
@Inject
private TenantIdSelectorService tenantService;
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
public String namespace;
@CommandLine.Parameters(index = "0", description = "The directory to export the file to")
public Path directory;
@Override
public Integer call() throws Exception {
super.call();
try(DefaultHttpClient client = client()) {
MutableHttpRequest<Object> request = HttpRequest
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
.accept(MediaType.APPLICATION_OCTET_STREAM);
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);
Path zipFile = Path.of(directory.toString(), DEFAULT_FILE_NAME);
zipFile.toFile().createNewFile();
Files.write(zipFile, response.body());
stdOut("Exporting template(s) for namespace '" + namespace + "' successfully done !");
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -1,35 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.models.validations.ModelValidator;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.util.Collections;
@CommandLine.Command(
name = "validate",
description = "Validate a template"
)
@TemplateEnabled
public class TemplateValidateCommand extends AbstractValidateCommand {
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
return this.call(
Template.class,
modelValidator,
(Object object) -> {
Template template = (Template) object;
return template.getNamespace() + " / " + template.getId();
},
(Object object) -> Collections.emptyList(),
(Object object) -> Collections.emptyList()
);
}
}

View File

@@ -1,31 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "namespace",
description = "Manage namespace templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceUpdateCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "template", "namespace", "--help");
return 0;
}
}

View File

@@ -1,74 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.util.List;
import jakarta.validation.ConstraintViolationException;
@CommandLine.Command(
name = "update",
description = "Update namespace templates",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
super.call();
try (var files = Files.walk(directory)) {
List<Template> templates = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> YamlParser.parse(path.toFile(), Template.class))
.toList();
if (templates.isEmpty()) {
stdOut("No template found on '{}'", directory.toFile().getAbsolutePath());
}
try (DefaultHttpClient client = client()) {
MutableHttpRequest<List<Template>> request = HttpRequest
.POST(apiUri("/templates/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "?delete=" + delete, templates);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),
Argument.listOf(UpdateResult.class)
);
stdOut(updated.size() + " template(s) for namespace '" + namespace + "' successfully updated !");
updated.forEach(template -> stdOut("- " + template.getNamespace() + "." + template.getId()));
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
} catch (ConstraintViolationException e) {
AbstractValidateCommand.handleException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -243,6 +243,10 @@ kestra:
ui-anonymous-usage-report:
enabled: true
ui:
charts:
default-duration: P30D
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/server-events

View File

@@ -14,7 +14,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class FlowDotCommandTest {
@Test
void run() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
URL directory = FlowDotCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));

View File

@@ -1,41 +0,0 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class FlowExpandCommandTest {
@SuppressWarnings("deprecation")
@Test
void run() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {
"src/test/resources/helper/include.yaml"
};
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).isEqualTo("id: include\n" +
"namespace: io.kestra.cli\n" +
"\n" +
"# The list of tasks\n" +
"tasks:\n" +
"- id: t1\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: \"Lorem ipsum dolor sit amet\"\n" +
"- id: t2\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: |\n" +
" Lorem ipsum dolor sit amet\n" +
" Lorem ipsum dolor sit amet\n");
}
}
}

View File

@@ -61,7 +61,6 @@ class FlowValidateCommandTest {
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - system / warning");
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
assertThat(out.toString()).contains(" - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
}
}

View File

@@ -1,62 +0,0 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -0,0 +1,147 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.App;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageObject;
import io.kestra.core.storages.kv.*;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.NonNull;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
public class KvMetadataMigrationCommandTest {
@Test
void run() throws IOException, ResourceExpiredException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
/* Initial setup:
* - namespace 1: key, description, value
* - namespace 1: expiredKey
* - namespace 2: anotherKey, anotherDescription
* - Nothing in database */
String namespace = TestsUtils.randomNamespace();
String key = "myKey";
StorageInterface storage = ctx.getBean(StorageInterface.class);
String description = "Some description";
String value = "someValue";
putOldKv(storage, namespace, key, description, value);
String anotherNamespace = TestsUtils.randomNamespace();
String anotherKey = "anotherKey";
String anotherDescription = "another description";
putOldKv(storage, anotherNamespace, anotherKey, anotherDescription, "anotherValue");
String tenantId = TenantService.MAIN_TENANT;
// Expired KV should not be migrated + should be purged from the storage
String expiredKey = "expiredKey";
putOldKv(storage, namespace, expiredKey, Instant.now().minus(Duration.ofMinutes(5)), "some expired description", "expiredValue");
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isTrue();
KvMetadataRepositoryInterface kvMetadataRepository = ctx.getBean(KvMetadataRepositoryInterface.class);
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
/* Expected outcome from the migration command:
* - no KV has been migrated because no flow exist in the namespace so they are not picked up because we don't know they exist */
String[] kvMetadataMigrationCommand = {
"migrate", "metadata", "kv"
};
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
// Still it's not in the metadata repository because no flow exist to find that kv
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
// A flow is created from namespace 1, so the KV in this namespace should be migrated
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
flowRepository.create(GenericFlow.of(Flow.builder()
.tenantId(tenantId)
.id("a-flow")
.namespace(namespace)
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build()));
/* We run the migration again:
* - namespace 1 KV is seen and metadata is migrated to database
* - namespace 2 KV is not seen because no flow exist in this namespace
* - expiredKey is deleted from storage and not migrated */
out.reset();
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
Optional<PersistedKvMetadata> foundKv = kvMetadataRepository.findByName(tenantId, namespace, key);
assertThat(foundKv.isPresent()).isTrue();
assertThat(foundKv.get().getDescription()).isEqualTo(description);
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
KVStore kvStore = new InternalKVStore(tenantId, namespace, storage, kvMetadataRepository);
Optional<KVEntry> actualKv = kvStore.get(key);
assertThat(actualKv.isPresent()).isTrue();
assertThat(actualKv.get().description()).isEqualTo(description);
Optional<KVValue> actualValue = kvStore.getValue(key);
assertThat(actualValue.isPresent()).isTrue();
assertThat(actualValue.get().value()).isEqualTo(value);
assertThat(kvMetadataRepository.findByName(tenantId, namespace, expiredKey).isPresent()).isFalse();
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isFalse();
/* We run one last time the migration without any change to verify that we don't resave an existing metadata.
* It covers the case where user didn't perform the migrate command yet but they played and added some KV from the UI (so those ones will already be in metadata database). */
out.reset();
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
foundKv = kvMetadataRepository.findByName(tenantId, namespace, key);
assertThat(foundKv.get().getVersion()).isEqualTo(1);
}
}
private static void putOldKv(StorageInterface storage, String namespace, String key, String description, String value) throws IOException {
putOldKv(storage, namespace, key, Instant.now().plus(Duration.ofMinutes(5)), description, value);
}
private static void putOldKv(StorageInterface storage, String namespace, String key, Instant expirationDate, String description, String value) throws IOException {
URI kvStorageUri = getKvStorageUri(namespace, key);
KVValueAndMetadata kvValueAndMetadata = new KVValueAndMetadata(new KVMetadata(description, expirationDate), value);
storage.put(TenantService.MAIN_TENANT, namespace, kvStorageUri, new StorageObject(
kvValueAndMetadata.metadataAsMap(),
new ByteArrayInputStream(JacksonMapper.ofIon().writeValueAsBytes(kvValueAndMetadata.value()))
));
}
private static @NonNull URI getKvStorageUri(String namespace, String key) {
return URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace) + "/" + key + ".ion");
}
}

View File

@@ -0,0 +1,29 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
public class SecretsMetadataMigrationCommandTest {
@Test
void run() {
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] secretMetadataMigrationCommand = {
"migrate", "metadata", "secrets"
};
PicocliRunner.call(App.class, ctx, secretMetadataMigrationCommand);
assertThat(err.toString()).contains("❌ Secrets Metadata migration failed: Secret migration is not needed in the OSS version");
}
}
}

View File

@@ -1,27 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra sys state-store");
}
}
}

View File

@@ -1,75 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreMigrateCommandTest {
@Test
void runMigration() throws IOException, ResourceExpiredException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).environments("test").start()) {
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
Flow flow = Flow.builder()
.tenantId("my-tenant")
.id("a-flow")
.namespace("some.valid.namespace." + ((int) (Math.random() * 1000000)))
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build();
flowRepository.create(GenericFlow.of(flow));
StorageInterface storage = ctx.getBean(StorageInterface.class);
String tenantId = flow.getTenantId();
URI oldStateStoreUri = URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of("a-flow") + "/states/my-state/" + Hashing.hashToString("my-taskrun-value") + "/sub-name");
storage.put(
tenantId,
flow.getNamespace(),
oldStateStoreUri,
new ByteArrayInputStream("my-value".getBytes())
);
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
"tenantId", tenantId,
"id", flow.getId(),
"namespace", flow.getNamespace()
)));
StateStore stateStore = new StateStore(runContext, true);
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));
String[] args = {};
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
assertThat(call).isZero();
}
}
}

View File

@@ -1,65 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceUpdateCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import java.util.zip.ZipFile;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateExportCommandTest {
@Test
void run() throws IOException {
URL directory = TemplateExportCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
// we use the update command to add templates to extract
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
// then we export them
String[] exportArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"--namespace",
"io.kestra.tests",
"/tmp",
};
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/templates.zip");
assertThat(file.exists()).isTrue();
ZipFile zipFile = new ZipFile(file);
assertThat(zipFile.stream().count()).isEqualTo(3L);
file.delete();
}
}
}

View File

@@ -1,61 +0,0 @@
package io.kestra.cli.commands.templates;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -1,26 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra template namespace");
}
}
}

View File

@@ -1,112 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceUpdateCommandTest {
@Test
void run() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
}
}
@Test
void invalid() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("invalidsTemplates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
// assertThat(call, is(1));
assertThat(out.toString()).contains("Unable to parse templates");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runNoDelete() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
URL subDirectory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates/templatesSubFolder");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
String[] newArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
subDirectory.getPath(),
"--no-delete"
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
assertThat(out.toString()).contains("1 template(s)");
}
}
}

View File

@@ -3,8 +3,8 @@ namespace: system
tasks:
- id: deprecated
type: io.kestra.plugin.core.debug.Echo
format: Hello World
type: io.kestra.plugin.core.log.Log
message: Hello World
- id: alias
type: io.kestra.core.tasks.log.Log
message: I'm an alias

View File

@@ -4,7 +4,6 @@ import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.inject.Singleton;
@@ -36,7 +35,6 @@ public class JsonSchemaCache {
public JsonSchemaCache(final JsonSchemaGenerator jsonSchemaGenerator) {
this.jsonSchemaGenerator = Objects.requireNonNull(jsonSchemaGenerator, "JsonSchemaGenerator cannot be null");
registerClassForType(SchemaType.FLOW, Flow.class);
registerClassForType(SchemaType.TEMPLATE, Template.class);
registerClassForType(SchemaType.TASK, Task.class);
registerClassForType(SchemaType.TRIGGER, AbstractTrigger.class);
registerClassForType(SchemaType.PLUGINDEFAULT, PluginDefault.class);

View File

@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.charts.Chart;
@@ -64,7 +63,7 @@ import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@Slf4j
public class JsonSchemaGenerator {
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
@@ -277,8 +276,8 @@ public class JsonSchemaGenerator {
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
.with(Option.PLAIN_DEFINITION_KEYS)
.with(Option.ALLOF_CLEANUP_AT_THE_END);
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
@Override
@@ -320,7 +319,7 @@ public class JsonSchemaGenerator {
// inline some type
builder.forTypesInGeneral()
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
@@ -688,15 +687,6 @@ public class JsonSchemaGenerator {
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == ScheduleCondition.class) {
return getRegisteredPlugins()
.stream()
.flatMap(registeredPlugin -> registeredPlugin.getConditions().stream())
.filter(ScheduleCondition.class::isAssignableFrom)
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == TaskRunner.class) {
return getRegisteredPlugins()
.stream()

View File

@@ -6,7 +6,6 @@ import io.kestra.core.utils.Enums;
public enum SchemaType {
FLOW,
TEMPLATE,
TASK,
TRIGGER,
PLUGINDEFAULT,

View File

@@ -91,11 +91,13 @@ public class HttpConfiguration {
@Deprecated
private final String proxyPassword;
@Schema(title = "The username for HTTP basic authentication.")
@Schema(title = "The username for HTTP basic authentication. " +
"Deprecated, use `auth` property with a `BasicAuthConfiguration` instance instead.")
@Deprecated
private final String basicAuthUser;
@Schema(title = "The password for HTTP basic authentication.")
@Schema(title = "The password for HTTP basic authentication. " +
"Deprecated, use `auth` property with a `BasicAuthConfiguration` instance instead.")
@Deprecated
private final String basicAuthPassword;

View File

@@ -0,0 +1,27 @@
package io.kestra.core.lock;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.time.LocalDateTime;
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Lock implements HasUID {
private String category;
private String id;
private String owner;
private Instant createdAt;
@Override
public String uid() {
return IdUtils.fromParts(this.category, this.id);
}
}

View File

@@ -0,0 +1,13 @@
package io.kestra.core.lock;
import io.kestra.core.exceptions.KestraRuntimeException;
public class LockException extends KestraRuntimeException {
public LockException(String message) {
super(message);
}
public LockException(Throwable cause) {
super(cause);
}
}

View File

@@ -0,0 +1,195 @@
package io.kestra.core.lock;
import io.kestra.core.repositories.LockRepositoryInterface;
import io.kestra.core.server.ServerInstance;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
/**
* This service provides facility for executing Runnable and Callable tasks inside a lock.
* Note: it may be handy to provide a tryLock facility that, if locked, skips executing the Runnable or Callable and exits immediately.
*
* @implNote There is no expiry for locks, so a service may hold a lock infinitely until the service is restarted as the
* liveness mechanism releases all locks when the service is unreachable.
* This may be improved at some point by adding an expiry (for ex 30s) and running a thread that will periodically
* increase the expiry for all exiting locks. This should allow quicker recovery of zombie locks than relying on the liveness mechanism,
* as a service wanted to lock an expired lock would be able to take it over.
*/
@Slf4j
@Singleton
public class LockService {
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300);
private static final int DEFAULT_SLEEP_MS = 1;
private final LockRepositoryInterface lockRepository;
@Inject
public LockService(LockRepositoryInterface lockRepository) {
this.lockRepository = lockRepository;
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
* @see #doInLock(String, String, Duration, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Runnable runnable) {
doInLock(category, id, DEFAULT_TIMEOUT, runnable);
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
* @see #doInLock(String, String, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Duration timeout, Runnable runnable) {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
runnable.run();
} finally {
unlock(category, id);
}
}
/**
* Attempts to execute the provided {@code runnable} within a lock.
* If the lock is already held by another process, the execution is skipped.
*
* @param category the category of the lock, e.g., 'executions'
* @param id the identifier of the lock within the specified category, e.g., an execution ID
* @param runnable the task to be executed if the lock is successfully acquired
*/
public void tryLock(String category, String id, Runnable runnable) {
if (lock(category, id, Duration.ZERO)) {
try {
runnable.run();
} finally {
unlock(category, id);
}
} else {
log.debug("Lock '{}'.'{}' already hold, skipping", category, id);
}
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Callable<T> callable) throws Exception {
return callInLock(category, id, DEFAULT_TIMEOUT, callable);
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Duration timeout, Callable<T> callable) throws Exception {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
return callable.call();
} finally {
unlock(category, id);
}
}
/**
* Release all locks hold by this service identifier.
*/
public List<Lock> releaseAllLocks(String serviceId) {
return lockRepository.deleteByOwner(serviceId);
}
/**
* @return true if the lock identified by this category and identifier already exist.
*/
public boolean isLocked(String category, String id) {
return lockRepository.findById(category, id).isPresent();
}
private boolean lock(String category, String id, Duration timeout) throws LockException {
log.debug("Locking '{}'.'{}'", category, id);
long deadline = System.currentTimeMillis() + timeout.toMillis();
do {
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
// we can try to lock!
Lock newLock = new Lock(category, id, ServerInstance.INSTANCE_ID, Instant.now());
if (lockRepository.create(newLock)) {
return true;
} else {
log.debug("Cannot create the lock, it may have been created after we check for its existence and before we create it");
}
} else {
log.debug("Already locked by: {}", existing.get().getOwner());
}
// fast path for when we don't want to wait for the lock
if (timeout.isZero()) {
return false;
}
try {
Thread.sleep(DEFAULT_SLEEP_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockException(e);
}
} while (System.currentTimeMillis() < deadline);
log.debug("Lock already hold, waiting for it to be released");
return false;
}
private void unlock(String category, String id) {
log.debug("Unlocking '{}'.'{}'", category, id);
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
log.warn("Try to unlock unknown lock '{}'.'{}', ignoring it", category, id);
return;
}
if (!existing.get().getOwner().equals(ServerInstance.INSTANCE_ID)) {
log.warn("Try to unlock a lock we no longer own '{}'.'{}', ignoring it", category, id);
return;
}
lockRepository.deleteById(category, id);
}
}

View File

@@ -0,0 +1,7 @@
package io.kestra.core.models;
public enum FetchVersion {
LATEST,
OLD,
ALL
}

View File

@@ -100,7 +100,7 @@ public record QueryFilter(
LABELS("labels") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
}
},
FLOW_ID("flowId") {
@@ -109,6 +109,12 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
UPDATED("updated") {
@Override
public List<Op> supportedOp() {
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
}
},
START_DATE("startDate") {
@Override
public List<Op> supportedOp() {
@@ -259,6 +265,16 @@ public record QueryFilter(
Field.NAMESPACE
);
}
},
KV_METADATA {
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY,
Field.NAMESPACE,
Field.UPDATED
);
}
};
public abstract List<Field> supportedField();

View File

@@ -0,0 +1,3 @@
package io.kestra.core.models;
public record TenantAndNamespace(String tenantId, String namespace) {}

View File

@@ -2,7 +2,11 @@ package io.kestra.core.models.conditions;
import io.kestra.core.exceptions.InternalException;
/**
* Conditions of type ScheduleCondition have a special behavior inside the {@link io.kestra.plugin.core.trigger.Schedule} trigger.
* They are evaluated specifically and would be taken into account when computing the next evaluation date.
* Only conditions based on date should be marked as ScheduleCondition.
*/
public interface ScheduleCondition {
boolean test(ConditionContext conditionContext) throws InternalException;
}

View File

@@ -28,6 +28,7 @@ import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -77,10 +78,12 @@ public class Execution implements DeletedInterface, TenantInterface {
@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Schema(implementation = Object.class)
Map<String, Object> inputs;
@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Schema(implementation = Object.class)
Map<String, Object> outputs;
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@@ -88,6 +91,7 @@ public class Execution implements DeletedInterface, TenantInterface {
List<Label> labels;
@With
@Schema(implementation = Object.class)
Map<String, Object> variables;
@NotNull
@@ -941,7 +945,15 @@ public class Execution implements DeletedInterface, TenantInterface {
for (TaskRun current : taskRuns) {
if (!MapUtils.isEmpty(current.getOutputs())) {
if (current.getIteration() != null) {
taskOutputs = MapUtils.merge(taskOutputs, outputs(current, byIds));
Map<String, Object> merged = MapUtils.merge(taskOutputs, outputs(current, byIds));
// If one of two of the map is null in the merge() method, we just return the other
// And if the not null map is a Variables (= read only), we cast it back to a simple
// hashmap to avoid taskOutputs becoming read-only
// i.e this happen in nested loopUntil tasks
if (merged instanceof Variables) {
merged = new HashMap<>(merged);
}
taskOutputs = merged;
} else {
taskOutputs.putAll(outputs(current, byIds));
}

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -55,6 +56,7 @@ public class TaskRun implements TenantInterface {
@With
@JsonInclude(JsonInclude.Include.ALWAYS)
@Nullable
@Schema(implementation = Object.class)
Variables outputs;
@NotNull
@@ -195,17 +197,17 @@ public class TaskRun implements TenantInterface {
taskRunBuilder.attempts = new ArrayList<>();
taskRunBuilder.attempts.add(TaskRunAttempt.builder()
.state(new State(this.state, State.Type.KILLED))
.state(new State(this.state, State.Type.RESUBMITTED))
.build()
);
} else {
ArrayList<TaskRunAttempt> taskRunAttempts = new ArrayList<>(taskRunBuilder.attempts);
TaskRunAttempt lastAttempt = taskRunAttempts.get(taskRunBuilder.attempts.size() - 1);
if (!lastAttempt.getState().isTerminated()) {
taskRunAttempts.set(taskRunBuilder.attempts.size() - 1, lastAttempt.withState(State.Type.KILLED));
taskRunAttempts.set(taskRunBuilder.attempts.size() - 1, lastAttempt.withState(State.Type.RESUBMITTED));
} else {
taskRunAttempts.add(TaskRunAttempt.builder()
.state(new State().withState(State.Type.KILLED))
.state(new State().withState(State.Type.RESUBMITTED))
.build()
);
}

View File

@@ -12,7 +12,6 @@ import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
@@ -49,7 +48,7 @@ import java.util.stream.Stream;
public class Flow extends AbstractFlow implements HasUID {
private static final ObjectMapper NON_DEFAULT_OBJECT_MAPPER = JacksonMapper.ofYaml()
.copy()
.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);
.setDefaultPropertyInclusion(JsonInclude.Include.NON_DEFAULT);
private static final ObjectMapper WITHOUT_REVISION_OBJECT_MAPPER = NON_DEFAULT_OBJECT_MAPPER.copy()
.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)
@@ -85,10 +84,6 @@ public class Flow extends AbstractFlow implements HasUID {
return this._finally;
}
@Valid
@Deprecated
List<Listener> listeners;
@Valid
List<Task> afterExecution;
@@ -98,20 +93,6 @@ public class Flow extends AbstractFlow implements HasUID {
@Valid
List<PluginDefault> pluginDefaults;
@Valid
List<PluginDefault> taskDefaults;
@Deprecated
public void setTaskDefaults(List<PluginDefault> taskDefaults) {
this.pluginDefaults = taskDefaults;
this.taskDefaults = taskDefaults;
}
@Deprecated
public List<PluginDefault> getTaskDefaults() {
return this.taskDefaults;
}
@Valid
Concurrency concurrency;
@@ -144,7 +125,7 @@ public class Flow extends AbstractFlow implements HasUID {
this.tasks != null ? this.tasks : Collections.<Task>emptyList(),
this.errors != null ? this.errors : Collections.<Task>emptyList(),
this._finally != null ? this._finally : Collections.<Task>emptyList(),
this.afterExecutionTasks()
this.afterExecution != null ? this.afterExecution : Collections.<Task>emptyList()
)
.flatMap(Collection::stream);
}
@@ -245,55 +226,6 @@ public class Flow extends AbstractFlow implements HasUID {
.orElse(null);
}
/**
* @deprecated should not be used
*/
@Deprecated(forRemoval = true, since = "0.21.0")
public Flow updateTask(String taskId, Task newValue) throws InternalException {
Task task = this.findTaskByTaskId(taskId);
Flow flow = this instanceof FlowWithSource flowWithSource ? flowWithSource.toFlow() : this;
Map<String, Object> map = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
return NON_DEFAULT_OBJECT_MAPPER.convertValue(
recursiveUpdate(map, task, newValue),
Flow.class
);
}
private static Object recursiveUpdate(Object object, Task previous, Task newValue) {
if (object instanceof Map<?, ?> value) {
if (value.containsKey("id") && value.get("id").equals(previous.getId()) &&
value.containsKey("type") && value.get("type").equals(previous.getType())
) {
return NON_DEFAULT_OBJECT_MAPPER.convertValue(newValue, JacksonMapper.MAP_TYPE_REFERENCE);
} else {
return value
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
recursiveUpdate(e.getValue(), previous, newValue)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
} else if (object instanceof Collection<?> value) {
return value
.stream()
.map(r -> recursiveUpdate(r, previous, newValue))
.toList();
} else {
return object;
}
}
private List<Task> afterExecutionTasks() {
return ListUtils.concat(
ListUtils.emptyOnNull(this.getListeners()).stream().flatMap(listener -> listener.getTasks().stream()).toList(),
this.getAfterExecution()
);
}
public boolean equalsWithoutRevision(FlowInterface o) {
try {
return WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(this).equals(WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(o));

View File

@@ -136,7 +136,7 @@ public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface
class SourceGenerator {
private static final ObjectMapper NON_DEFAULT_OBJECT_MAPPER = JacksonMapper.ofJson()
.copy()
.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);
.setDefaultPropertyInclusion(JsonInclude.Include.NON_DEFAULT);
static String generate(final FlowInterface flow) {
try {

View File

@@ -19,7 +19,6 @@ public class FlowWithSource extends Flow {
String source;
@SuppressWarnings("deprecation")
public Flow toFlow() {
return Flow.builder()
.tenantId(this.tenantId)
@@ -34,7 +33,6 @@ public class FlowWithSource extends Flow {
.tasks(this.tasks)
.errors(this.errors)
._finally(this._finally)
.listeners(this.listeners)
.afterExecution(this.afterExecution)
.triggers(this.triggers)
.pluginDefaults(this.pluginDefaults)
@@ -60,7 +58,6 @@ public class FlowWithSource extends Flow {
.build();
}
@SuppressWarnings("deprecation")
public static FlowWithSource of(Flow flow, String source) {
return FlowWithSource.builder()
.tenantId(flow.tenantId)
@@ -76,7 +73,6 @@ public class FlowWithSource extends Flow {
.errors(flow.errors)
._finally(flow._finally)
.afterExecution(flow.afterExecution)
.listeners(flow.listeners)
.triggers(flow.triggers)
.pluginDefaults(flow.pluginDefaults)
.disabled(flow.disabled)

View File

@@ -1,10 +1,10 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
import io.kestra.core.models.property.Property;
import io.kestra.core.validations.InputValidation;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
@@ -25,7 +25,6 @@ import lombok.experimental.SuperBuilder;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
@JsonSubTypes({
@JsonSubTypes.Type(value = ArrayInput.class, name = "ARRAY"),
@JsonSubTypes.Type(value = BooleanInput.class, name = "BOOLEAN"),
@JsonSubTypes.Type(value = BoolInput.class, name = "BOOL"),
@JsonSubTypes.Type(value = DateInput.class, name = "DATE"),
@JsonSubTypes.Type(value = DateTimeInput.class, name = "DATETIME"),
@@ -36,7 +35,6 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = JsonInput.class, name = "JSON"),
@JsonSubTypes.Type(value = SecretInput.class, name = "SECRET"),
@JsonSubTypes.Type(value = StringInput.class, name = "STRING"),
@JsonSubTypes.Type(value = EnumInput.class, name = "ENUM"),
@JsonSubTypes.Type(value = SelectInput.class, name = "SELECT"),
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
@JsonSubTypes.Type(value = URIInput.class, name = "URI"),
@@ -44,6 +42,7 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = YamlInput.class, name = "YAML"),
@JsonSubTypes.Type(value = EmailInput.class, name = "EMAIL"),
})
@InputValidation
public abstract class Input<T> implements Data {
@Schema(
title = "The ID of the input."
@@ -53,9 +52,6 @@ public abstract class Input<T> implements Data {
@Pattern(regexp="^[a-zA-Z0-9][.a-zA-Z0-9_-]*")
String id;
@Deprecated
String name;
@Schema(
title = "The type of the input."
)
@@ -80,20 +76,17 @@ public abstract class Input<T> implements Data {
title = "The default value to use if no value is specified."
)
Property<T> defaults;
@Schema(
title = "The suggested value for the input.",
description = "Optional UI hint for pre-filling the input. Cannot be used together with a default value."
)
Property<T> prefill;
@Schema(
title = "The display name of the input."
)
String displayName;
public abstract void validate(T input) throws ConstraintViolationException;
@JsonSetter
public void setName(String name) {
if (this.id == null) {
this.id = name;
}
this.name = name;
}
}

View File

@@ -236,14 +236,15 @@ public class State {
RETRYING,
RETRIED,
SKIPPED,
BREAKPOINT;
BREAKPOINT,
RESUBMITTED;
public boolean isTerminated() {
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED;
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED || this == Type.RESUBMITTED;
}
public boolean isTerminatedNoFail() {
return this == Type.WARNING || this == Type.SUCCESS || this == Type.RETRIED || this == Type.SKIPPED;
return this == Type.WARNING || this == Type.SUCCESS || this == Type.RETRIED || this == Type.SKIPPED || this == Type.RESUBMITTED;
}
public boolean isCreated() {

View File

@@ -9,11 +9,9 @@ import io.micronaut.core.annotation.Introspected;
@Introspected
public enum Type {
STRING(StringInput.class.getName()),
ENUM(EnumInput.class.getName()),
SELECT(SelectInput.class.getName()),
INT(IntInput.class.getName()),
FLOAT(FloatInput.class.getName()),
BOOLEAN(BooleanInput.class.getName()),
BOOL(BoolInput.class.getName()),
DATETIME(DateTimeInput.class.getName()),
DATE(DateInput.class.getName()),

View File

@@ -1,19 +0,0 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import jakarta.validation.ConstraintViolationException;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class BooleanInput extends Input<Boolean> {
@Override
public void validate(Boolean input) throws ConstraintViolationException {
// no validation yet
}
}

View File

@@ -1,39 +0,0 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class EnumInput extends Input<String> {
@Schema(
title = "List of values.",
description = "DEPRECATED; use 'SELECT' instead."
)
@NotNull
List<@Regex String> values;
@Override
public void validate(String input) throws ConstraintViolationException {
if (!values.contains(input) && this.getRequired()) {
throw ManualConstraintViolation.toConstraintViolationException(
"it must match the values `" + values + "`",
this,
EnumInput.class,
getId(),
input
);
}
}
}

View File

@@ -1,10 +1,9 @@
package io.kestra.core.models.flows.input;
import java.util.Set;
import io.kestra.core.models.flows.Input;
import io.kestra.core.validations.FileInputValidation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@@ -18,24 +17,35 @@ import java.util.List;
@FileInputValidation
public class FileInput extends Input<URI> {
private static final String DEFAULT_EXTENSION = ".upl";
public static final String DEFAULT_EXTENSION = ".upl";
@Deprecated(since = "0.24", forRemoval = true)
public String extension;
/**
* List of allowed file extensions (e.g., [".csv", ".txt", ".pdf"]).
* Each extension must start with a dot.
*/
private List<String> allowedFileExtensions;
/**
* Gets the file extension from the URI's path
*/
private String getFileExtension(URI uri) {
String path = uri.getPath();
int lastDotIndex = path.lastIndexOf(".");
return lastDotIndex >= 0 ? path.substring(lastDotIndex).toLowerCase() : "";
}
@Override
public void validate(URI input) throws ConstraintViolationException {
// no validation yet
}
if (input == null || allowedFileExtensions == null || allowedFileExtensions.isEmpty()) {
return;
}
public static String findFileInputExtension(@NotNull final List<Input<?>> inputs, @NotNull final String fileName) {
String res = inputs.stream()
.filter(in -> in instanceof FileInput)
.filter(in -> in.getId().equals(fileName))
.filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
.map(flowInput -> ((FileInput) flowInput).getExtension())
.findFirst()
.orElse(FileInput.DEFAULT_EXTENSION);
return res.startsWith(".") ? res : "." + res;
String extension = getFileExtension(input);
if (!allowedFileExtensions.contains(extension.toLowerCase())) {
throw new ConstraintViolationException(
"File type not allowed. Accepted extensions: " + String.join(", ", allowedFileExtensions),
Set.of()
);
}
}
}

View File

@@ -1,12 +0,0 @@
package io.kestra.core.models.flows.sla;
import java.time.Instant;
import java.util.function.Consumer;
public interface SLAMonitorStorage {
void save(SLAMonitor slaMonitor);
void purge(String executionId);
void processExpired(Instant now, Consumer<SLAMonitor> consumer);
}

View File

@@ -1,34 +0,0 @@
package io.kestra.core.models.hierarchies;
import lombok.Getter;
import java.util.List;
@SuppressWarnings("this-escape")
@Getter
public class CustomGraphCluster extends GraphCluster {
public CustomGraphCluster(String uid, GraphTask rootTask, List<CustomGraphNode> nodes) {
super(rootTask, uid, RelationType.SEQUENTIAL); // TODO should we add a custom relation type?
this.getGraph().addNode(rootTask);
this.addEdge(this.getRoot(), rootTask, new Relation());
this.getGraph().removeEdge(rootTask, this.getFinally());
this.getGraph().removeEdge(rootTask, this.getAfterExecution());
this.getGraph().removeNode(this.getFinally());
this.getGraph().removeNode(this.getAfterExecution());
nodes.forEach(node -> {
this.getGraph().addNode(node);
this.addEdge(rootTask, node, new Relation(RelationType.SEQUENTIAL, null));
this.addEdge(node, this.getEnd(), new Relation());
});
}
@Override
public void updateUidWithChildren(String uid) {
// as children are not "regular children with parent -> child relationship as with flowable task"
// we fall back to the existing UID.
this.uid = uid;
}
}

View File

@@ -1,24 +0,0 @@
package io.kestra.core.models.hierarchies;
import io.kestra.core.models.Plugin;
public class CustomGraphNode extends AbstractGraph {
private final String label;
private final Plugin plugin;
public CustomGraphNode(String uid, String label, Plugin plugin) {
super(uid);
this.label = label;
this.plugin = plugin;
}
@Override
public String getLabel() {
return label;
}
public Plugin getPlugin() {
return plugin;
}
}

View File

@@ -0,0 +1,79 @@
package io.kestra.core.models.kv;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
import java.util.Optional;
@Builder(toBuilder = true)
@Slf4j
@Getter
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@AllArgsConstructor
@ToString
@EqualsAndHashCode
public class PersistedKvMetadata implements DeletedInterface, TenantInterface, HasUID {
@With
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
private String tenantId;
@NotNull
private String namespace;
@NotNull
private String name;
private String description;
@NotNull
private Integer version;
@Builder.Default
private boolean last = true;
@Nullable
private Instant expirationDate;
@Nullable
private Instant created;
@Nullable
private Instant updated;
private boolean deleted;
public static PersistedKvMetadata from(String tenantId, KVEntry kvEntry) {
return PersistedKvMetadata.builder()
.tenantId(tenantId)
.namespace(kvEntry.namespace())
.name(kvEntry.key())
.version(kvEntry.version())
.description(kvEntry.description())
.created(kvEntry.creationDate())
.updated(kvEntry.updateDate())
.expirationDate(kvEntry.expirationDate())
.build();
}
public PersistedKvMetadata asLast() {
Instant saveDate = Instant.now();
return this.toBuilder().created(Optional.ofNullable(this.created).orElse(saveDate)).updated(saveDate).last(true).build();
}
@Override
public String uid() {
return IdUtils.fromParts(getTenantId(), getNamespace(), getName(), getVersion().toString());
}
}

View File

@@ -1,25 +0,0 @@
package io.kestra.core.models.listeners;
import io.micronaut.core.annotation.Introspected;
import lombok.Builder;
import lombok.Value;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.tasks.Task;
import java.util.List;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
@Value
@Builder
@Introspected
public class Listener {
String description;
@Valid
List<Condition> conditions;
@Valid
@NotEmpty
List<Task> tasks;
}

View File

@@ -54,12 +54,7 @@ public class Property<T> {
private String expression;
private T value;
/**
* @deprecated use {@link #ofExpression(String)} instead.
*/
@Deprecated
// Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
public Property(String expression) {
private Property(String expression) {
this.expression = expression;
}
@@ -123,14 +118,6 @@ public class Property<T> {
return p;
}
/**
* @deprecated use {@link #ofValue(Object)} instead.
*/
@Deprecated
public static <V> Property<V> of(V value) {
return ofValue(value);
}
/**
* Build a new Property object with a Pebble expression.<br>
* <p>

View File

@@ -2,14 +2,8 @@ package io.kestra.core.models.tasks;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.WorkerJobLifecycle;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.hierarchies.AbstractGraph;
import io.kestra.core.models.hierarchies.GraphTask;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.runners.RunContext;
import java.util.List;
/**
* Interface for tasks that are run in the Worker.
*/
@@ -18,13 +12,4 @@ public interface RunnableTask <T extends Output> extends Plugin, WorkerJobLifecy
* This method is called inside the Worker to run (execute) the task.
*/
T run(RunContext runContext) throws Exception;
/**
* Create the topology representation of a runnable task.
* <p>
* By default, it returns a single GraphTask, tasks may override it to provide a custom topology representation.
*/
default AbstractGraph graph(TaskRun taskRun, List<String> values, RelationType relationType) {
return new GraphTask((Task) this, taskRun, values, relationType);
}
}

View File

@@ -15,31 +15,10 @@ public class TaskException extends Exception {
private transient AbstractLogConsumer logConsumer;
/**
* This constructor will certainly be removed in 0.21 as we keep it only because all task runners must be impacted.
* @deprecated use {@link #TaskException(int, AbstractLogConsumer)} instead.
*/
@Deprecated(forRemoval = true, since = "0.20.0")
public TaskException(int exitCode, int stdOutCount, int stdErrCount) {
this("Command failed with exit code " + exitCode, exitCode, stdOutCount, stdErrCount);
}
public TaskException(int exitCode, AbstractLogConsumer logConsumer) {
this("Command failed with exit code " + exitCode, exitCode, logConsumer);
}
/**
* This constructor will certainly be removed in 0.21 as we keep it only because all task runners must be impacted.
* @deprecated use {@link #TaskException(String, int, AbstractLogConsumer)} instead.
*/
@Deprecated(forRemoval = true, since = "0.20.0")
public TaskException(String message, int exitCode, int stdOutCount, int stdErrCount) {
super(message);
this.exitCode = exitCode;
this.stdOutCount = stdOutCount;
this.stdErrCount = stdErrCount;
}
public TaskException(String message, int exitCode, AbstractLogConsumer logConsumer) {
super(message);
this.exitCode = exitCode;

View File

@@ -1,156 +0,0 @@
package io.kestra.core.models.templates;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.Hidden;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.*;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class Template implements DeletedInterface, TenantInterface, HasUID {
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml().copy()
.setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
@Override
public boolean hasIgnoreMarker(final AnnotatedMember m) {
List<String> exclusions = Arrays.asList("revision", "deleted", "source");
return exclusions.contains(m.getName()) || super.hasIgnoreMarker(m);
}
})
.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);
@Setter
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
private String tenantId;
@NotNull
@NotBlank
@Pattern(regexp = "^[a-zA-Z0-9][a-zA-Z0-9._-]*")
private String id;
@NotNull
@Pattern(regexp="^[a-z0-9][a-z0-9._-]*")
private String namespace;
String description;
@Valid
@NotEmpty
private List<Task> tasks;
@Valid
private List<Task> errors;
@Valid
@JsonProperty("finally")
@Getter(AccessLevel.NONE)
protected List<Task> _finally;
public List<Task> getFinally() {
return this._finally;
}
@NotNull
@Builder.Default
private final boolean deleted = false;
/** {@inheritDoc **/
@Override
@JsonIgnore
public String uid() {
return Template.uid(
this.getTenantId(),
this.getNamespace(),
this.getId()
);
}
@JsonIgnore
public static String uid(String tenantId, String namespace, String id) {
return IdUtils.fromParts(
tenantId,
namespace,
id
);
}
public Optional<ConstraintViolationException> validateUpdate(Template updated) {
Set<ConstraintViolation<?>> violations = new HashSet<>();
if (!updated.getId().equals(this.getId())) {
violations.add(ManualConstraintViolation.of(
"Illegal template id update",
updated,
Template.class,
"template.id",
updated.getId()
));
}
if (!updated.getNamespace().equals(this.getNamespace())) {
violations.add(ManualConstraintViolation.of(
"Illegal namespace update",
updated,
Template.class,
"template.namespace",
updated.getNamespace()
));
}
if (!violations.isEmpty()) {
return Optional.of(new ConstraintViolationException(violations));
} else {
return Optional.empty();
}
}
public String generateSource() {
try {
return YAML_MAPPER.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public Template toDeleted() {
return new Template(
this.tenantId,
this.id,
this.namespace,
this.description,
this.tasks,
this.errors,
this._finally,
true
);
}
}

View File

@@ -1,15 +0,0 @@
package io.kestra.core.models.templates;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.StringUtils;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "kestra.templates.enabled", value = StringUtils.TRUE, defaultValue = StringUtils.FALSE)
@Inherited
public @interface TemplateEnabled {
}

View File

@@ -1,18 +0,0 @@
package io.kestra.core.models.templates;
import io.micronaut.core.annotation.Introspected;
import lombok.*;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
@SuperBuilder
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class TemplateSource extends Template {
String source;
String exception;
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.plugins;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Builder;
import java.io.File;
@@ -33,7 +34,7 @@ public record PluginArtifact(
String version,
URI uri
) implements Comparable<PluginArtifact> {
private static final Pattern ARTIFACT_PATTERN = Pattern.compile(
"([^: ]+):([^: ]+)(:([^: ]*)(:([^: ]+))?)?:([^: ]+)"
);
@@ -42,7 +43,8 @@ public record PluginArtifact(
);
public static final String JAR_EXTENSION = "jar";
public static final String KESTRA_GROUP_ID = "io.kestra";
/**
* Static helper method for constructing a new {@link PluginArtifact} from a JAR file.
*
@@ -135,6 +137,11 @@ public record PluginArtifact(
public String toString() {
return toCoordinates();
}
@JsonIgnore
public boolean isOfficial() {
return groupId.startsWith(KESTRA_GROUP_ID);
}
public String toCoordinates() {
return Stream.of(groupId, artifactId, extension, classifier, version)

View File

@@ -1,9 +1,13 @@
package io.kestra.core.plugins;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.Version;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.HttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -15,9 +19,12 @@ import java.util.Base64;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Services for retrieving available plugin artifacts for Kestra.
@@ -39,6 +46,8 @@ public class PluginCatalogService {
private final boolean icons;
private final boolean oss;
private final Version currentStableVersion;
/**
* Creates a new {@link PluginCatalogService} instance.
@@ -53,11 +62,55 @@ public class PluginCatalogService {
this.httpClient = httpClient;
this.icons = icons;
this.oss = communityOnly;
Version version = Version.of(KestraContext.getContext().getVersion());
this.currentStableVersion = new Version(version.majorVersion(), version.minorVersion(), version.patchVersion(), null);
// Immediately trigger an async load of plugin artifacts.
this.isLoaded.set(true);
this.plugins = CompletableFuture.supplyAsync(this::load);
}
/**
* Resolves the version for the given artifacts.
*
* @param artifacts The list of artifacts to resolve.
* @return The list of results.
*/
public List<PluginResolutionResult> resolveVersions(List<PluginArtifact> artifacts) {
if (ListUtils.isEmpty(artifacts)) {
return List.of();
}
final Map<String, ApiPluginArtifact> pluginsByGroupAndArtifactId = getAllCompatiblePlugins().stream()
.collect(Collectors.toMap(it -> it.groupId() + ":" + it.artifactId(), Function.identity()));
return artifacts.stream().map(it -> {
// Get all compatible versions for current artifact
List<String> versions = Optional
.ofNullable(pluginsByGroupAndArtifactId.get(it.groupId() + ":" + it.artifactId()))
.map(ApiPluginArtifact::versions)
.orElse(List.of());
// Try to resolve the version
String resolvedVersion = null;
if (!versions.isEmpty()) {
if (it.version().equalsIgnoreCase("LATEST")) {
resolvedVersion = versions.getFirst();
} else {
resolvedVersion = versions.contains(it.version()) ? it.version() : null;
}
}
// Build the PluginResolutionResult
return new PluginResolutionResult(
it,
resolvedVersion,
versions,
resolvedVersion != null
);
}).toList();
}
public synchronized List<PluginManifest> get() {
try {
@@ -140,7 +193,27 @@ public class PluginCatalogService {
isLoaded.set(false);
}
}
private List<ApiPluginArtifact> getAllCompatiblePlugins() {
MutableHttpRequest<Object> request = HttpRequest.create(
HttpMethod.GET,
"/v1/plugins/artifacts/core-compatibility/" + currentStableVersion
);
if (oss) {
request.getParameters().add("license", "OPENSOURCE");
}
try {
return httpClient
.toBlocking()
.exchange(request, Argument.listOf(ApiPluginArtifact.class))
.body();
} catch (Exception e) {
log.debug("Failed to retrieve available plugins from Kestra API. Cause: ", e);
return List.of();
}
}
public record PluginManifest(
String title,
String icon,
@@ -153,4 +226,11 @@ public class PluginCatalogService {
return groupId + ":" + artifactId + ":LATEST";
}
}
public record ApiPluginArtifact(
String groupId,
String artifactId,
String license,
List<String> versions
) {}
}

View File

@@ -5,22 +5,19 @@ import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.*;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
String EXECUTOR_NAMED = "executorQueue";
String EXECUTION_EVENT_NAMED = "executionEventQueue";
String WORKERJOB_NAMED = "workerJobQueue";
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
String WORKERTRIGGERRESULT_NAMED = "workerTriggerResultQueue";
String FLOW_NAMED = "flowQueue";
String TEMPLATE_NAMED = "templateQueue";
String WORKERTASKLOG_NAMED = "workerTaskLogQueue";
String METRIC_QUEUE = "workerTaskMetricQueue";
String KILL_NAMED = "executionKilledQueue";
String WORKERINSTANCE_NAMED = "workerInstanceQueue";
String WORKERJOBRUNNING_NAMED = "workerJobRunningQueue";
String TRIGGER_NAMED = "triggerQueue";
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
@@ -30,7 +27,7 @@ public interface QueueFactoryInterface {
QueueInterface<Execution> execution();
QueueInterface<Executor> executor();
QueueInterface<ExecutionEvent> executionEvent();
WorkerJobQueueInterface workerJob();
@@ -46,10 +43,6 @@ public interface QueueFactoryInterface {
QueueInterface<ExecutionKilled> kill();
QueueInterface<Template> template();
QueueInterface<WorkerInstance> workerInstance();
QueueInterface<WorkerJobRunning> workerJobRunning();
QueueInterface<Trigger> trigger();

View File

@@ -35,6 +35,24 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
void delete(String consumerGroup, T message) throws QueueException;
/**
* Delete all messages of the queue for this key.
* This is used to purge a queue for a specific key.
* A queue implementation may omit to implement it and purge records differently.
*/
default void deleteByKey(String key) throws QueueException {
// by default do nothing
}
/**
* Delete all messages of the queue for a set of keys.
* This is used to purge a queue for specific keys.
* A queue implementation may omit to implement it and purge records differently.
*/
default void deleteByKeys(List<String> keys) throws QueueException {
// by default do nothing
}
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
return receive(null, consumer, false);
}
@@ -54,4 +72,20 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
}
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(null, queueType, consumer);
}
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(consumerGroup, queueType, consumer, true);
}
/**
* Consumer a batch of messages.
* By default, it consumes a single message, a queue implementation may implement it to support batch consumption.
*/
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer, boolean forUpdate) {
return receive(consumerGroup, either -> consumer.accept(List.of(either)), forUpdate);
}
}

View File

@@ -19,12 +19,8 @@ public class QueueService {
return ((SubflowExecution<?>) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionResult.class) {
return ((SubflowExecutionResult) object).getExecutionId();
} else if (object.getClass() == ExecutorState.class) {
return ((ExecutorState) object).getExecutionId();
} else if (object.getClass() == Setting.class) {
return ((Setting) object).getKey();
} else if (object.getClass() == Executor.class) {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == MetricEntry.class) {
return null;
} else if (object.getClass() == SubflowExecutionEnd.class) {

View File

@@ -14,19 +14,19 @@ import java.time.Instant;
@Requires(property = "kestra.server-type")
@Slf4j
public class ReportableScheduler {
private final ReportableRegistry registry;
private final ServerEventSender sender;
private final Clock clock;
@Inject
public ReportableScheduler(ReportableRegistry registry, ServerEventSender sender) {
this.registry = registry;
this.sender = sender;
this.clock = Clock.systemDefaultZone();
}
@Scheduled(fixedDelay = "5m", initialDelay = "${kestra.anonymous-usage-report.initial-delay}")
@Scheduled(fixedDelay = "5m", initialDelay = "${kestra.anonymous-usage-report.initial-delay:5m}")
public void tick() {
Instant now = clock.instant();
for (Reportable<?> r : registry.getAll()) {

View File

@@ -18,6 +18,7 @@ import io.micronaut.http.hateoas.JsonError;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
@@ -28,29 +29,30 @@ import java.util.UUID;
@Singleton
@Slf4j
public class ServerEventSender {
private static final String SESSION_UUID = IdUtils.create();
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();
@Inject
@Client
private ReactorHttpClient client;
@Inject
private VersionProvider versionProvider;
@Inject
private InstanceService instanceService;
private final ServerType serverType;
@Value("${kestra.anonymous-usage-report.uri}")
@Setter
@Value("${kestra.anonymous-usage-report.uri:'https://api.kestra.io/v1/reports/server-events'}")
protected URI url;
public ServerEventSender( ) {
this.serverType = KestraContext.getContext().getServerType();
}
public void send(final Instant now, final Type type, Object event) {
ServerEvent serverEvent = ServerEvent
.builder()
@@ -65,11 +67,11 @@ public class ServerEventSender {
.build();
try {
MutableHttpRequest<ServerEvent> request = this.request(serverEvent, type);
if (log.isTraceEnabled()) {
log.trace("Report anonymous usage: '{}'", OBJECT_MAPPER.writeValueAsString(serverEvent));
}
this.handleResponse(client.toBlocking().retrieve(request, Argument.of(Result.class), Argument.of(JsonError.class)));
} catch (HttpClientResponseException t) {
log.trace("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
@@ -77,11 +79,11 @@ public class ServerEventSender {
log.trace("Unable to handle anonymous usage", t);
}
}
private void handleResponse (Result result){
}
protected MutableHttpRequest<ServerEvent> request(ServerEvent event, Type type) throws Exception {
URI baseUri = URI.create(this.url.toString().endsWith("/") ? this.url.toString() : this.url + "/");
URI resolvedUri = baseUri.resolve(type.name().toLowerCase());

View File

@@ -0,0 +1,25 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.ConcurrencyLimit;
import jakarta.validation.constraints.NotNull;
import java.util.List;
import java.util.Optional;
public interface ConcurrencyLimitRepositoryInterface {
/**
* Update a concurrency limit
* WARNING: this is inherently unsafe and must only be used for administration
*/
ConcurrencyLimit update(ConcurrencyLimit concurrencyLimit);
/**
* Returns all concurrency limits from the database for a given tenant
*/
List<ConcurrencyLimit> find(String tenantId);
/**
* Find a concurrency limit by its id
*/
Optional<ConcurrencyLimit> findById(@NotNull String tenantId, @NotNull String namespace, @NotNull String flowId);
}

View File

@@ -2,7 +2,6 @@ package io.kestra.core.repositories;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;

View File

@@ -1,6 +1,7 @@
package io.kestra.core.repositories;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.topologies.FlowTopology;
import java.util.List;
@@ -13,4 +14,6 @@ public interface FlowTopologyRepositoryInterface {
List<FlowTopology> findAll(String tenantId);
FlowTopology save(FlowTopology flowTopology);
void save(FlowInterface flow, List<FlowTopology> flowTopologies);
}

View File

@@ -0,0 +1,48 @@
package io.kestra.core.repositories;
import io.kestra.core.models.FetchVersion;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.micronaut.data.model.Pageable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
public interface KvMetadataRepositoryInterface extends SaveRepositoryInterface<PersistedKvMetadata> {
Optional<PersistedKvMetadata> findByName(
String tenantId,
String namespace,
String name
) throws IOException;
default ArrayListTotal<PersistedKvMetadata> find(
Pageable pageable,
String tenantId,
List<QueryFilter> filters,
boolean allowDeleted,
boolean allowExpired
) {
return this.find(pageable, tenantId, filters, allowDeleted, allowExpired, FetchVersion.LATEST);
}
ArrayListTotal<PersistedKvMetadata> find(
Pageable pageable,
String tenantId,
List<QueryFilter> filters,
boolean allowDeleted,
boolean allowExpired,
FetchVersion fetchBehavior
);
default PersistedKvMetadata delete(PersistedKvMetadata persistedKvMetadata) throws IOException {
return this.save(persistedKvMetadata.toBuilder().deleted(true).build());
}
/**
* Purge (hard delete) a list of persisted kv metadata. If no version is specified, all versions are purged.
* @param persistedKvsMetadata the list of persisted kv metadata to purge
* @return the number of purged persisted kv metadata
*/
Integer purge(List<PersistedKvMetadata> persistedKvsMetadata);
}

View File

@@ -0,0 +1,24 @@
package io.kestra.core.repositories;
import io.kestra.core.lock.Lock;
import java.util.List;
import java.util.Optional;
/**
* Low lever repository for locks.
* It should never be used directly but only via the {@link io.kestra.core.lock.LockService}.
*/
public interface LockRepositoryInterface {
Optional<Lock> findById(String category, String id);
boolean create(Lock newLock);
default void delete(Lock existing) {
deleteById(existing.getCategory(), existing.getId());
}
void deleteById(String category, String id);
List<Lock> deleteByOwner(String owner);
}

View File

@@ -5,7 +5,5 @@ import java.util.List;
public interface SaveRepositoryInterface<T> {
T save(T item);
default int saveBatch(List<T> items) {
throw new UnsupportedOperationException();
}
int saveBatch(List<T> items);
}

View File

@@ -1,7 +1,9 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.TransactionContext;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.ServiceStateTransition;
import io.kestra.core.server.ServiceType;
import io.micronaut.data.model.Pageable;
@@ -9,6 +11,7 @@ import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
/**
@@ -58,20 +61,6 @@ public interface ServiceInstanceRepositoryInterface {
*/
ServiceInstance save(ServiceInstance service);
/**
* Finds all service instances which are in the given state.
*
* @return the list of {@link ServiceInstance}.
*/
List<ServiceInstance> findAllInstancesInState(final Service.ServiceState state);
/**
* Finds all service instances which are in the given state.
*
* @return the list of {@link ServiceInstance}.
*/
List<ServiceInstance> findAllInstancesInStates(final Set<Service.ServiceState> states);
/**
* Finds all service active instances between the given dates.
*
@@ -84,6 +73,28 @@ public interface ServiceInstanceRepositoryInterface {
final Instant from,
final Instant to);
/**
* Finds all service instances which are NOT {@link Service.ServiceState#RUNNING}, then process them using the consumer.
*/
void processAllNonRunningInstances(BiConsumer<TransactionContext, ServiceInstance> consumer);
/**
* Attempt to transition the state of a given service to a given new state.
* This method may not update the service if the transition is not valid.
*
* @param instance the service instance.
* @param newState the new state of the service.
* @return an optional of the {@link ServiceInstance} or {@link Optional#empty()} if the service is not running.
*/
ServiceStateTransition.Response mayTransitServiceTo(final TransactionContext txContext,
final ServiceInstance instance,
final Service.ServiceState newState,
final String reason);
/**
* Finds all service instances that are in the states, then process them using the consumer.
*/
void processInstanceInStates(Set<Service.ServiceState> states, BiConsumer<TransactionContext, ServiceInstance> consumer);
/**
* Purge all instances in the EMPTY state older than the until date.
*

View File

@@ -1,42 +0,0 @@
package io.kestra.core.repositories;
import io.micronaut.data.model.Pageable;
import io.kestra.core.models.templates.Template;
import java.util.List;
import java.util.Optional;
import jakarta.annotation.Nullable;
public interface TemplateRepositoryInterface {
Optional<Template> findById(String tenantId, String namespace, String id);
List<Template> findAll(String tenantId);
List<Template> findAllWithNoAcl(String tenantId);
List<Template> findAllForAllTenants();
ArrayListTotal<Template> find(
Pageable pageable,
@Nullable String query,
@Nullable String tenantId,
@Nullable String namespace
);
// Should normally be TemplateWithSource but it didn't exist yet
List<Template> find(
@Nullable String query,
@Nullable String tenantId,
@Nullable String namespace
);
List<Template> findByNamespace(String tenantId, String namespace);
Template create(Template template);
Template update(Template template, Template previous);
void delete(Template template);
List<String> findDistinctNamespace(String tenantId);
}

View File

@@ -1,13 +0,0 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.WorkerJobRunning;
import java.util.Optional;
public interface WorkerJobRunningRepositoryInterface {
Optional<WorkerJobRunning> findByKey(String uid);
void deleteByKey(String uid);
}

View File

@@ -1,9 +1,11 @@
package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.PluginDefaultService;
import jakarta.inject.Singleton;
import java.util.Objects;
import lombok.Setter;
@@ -15,12 +17,14 @@ import java.util.Optional;
@Singleton
public class DefaultFlowMetaStore implements FlowMetaStoreInterface {
private final FlowRepositoryInterface flowRepository;
private final PluginDefaultService pluginDefaultService;
@Setter
private List<FlowWithSource> allFlows;
public DefaultFlowMetaStore(FlowListenersInterface flowListeners, FlowRepositoryInterface flowRepository) {
public DefaultFlowMetaStore(FlowListenersInterface flowListeners, FlowRepositoryInterface flowRepository, PluginDefaultService pluginDefaultService) {
this.flowRepository = flowRepository;
this.pluginDefaultService = pluginDefaultService;
flowListeners.listen(flows -> allFlows = flows);
}
@@ -53,4 +57,9 @@ public class DefaultFlowMetaStore implements FlowMetaStoreInterface {
public Boolean isReady() {
return true;
}
@Override
public Optional<FlowWithSource> findByExecutionThenInjectDefaults(Execution execution) {
return findByExecution(execution).map(it -> pluginDefaultService.injectDefaults(it, execution));
}
}

Some files were not shown because too many files have changed in this diff Show More