Compare commits

...

148 Commits

Author SHA1 Message Date
Loïc Mathieu
8746c24170 chore(system): extract queue consumers processing into message handlers 2025-11-19 11:40:56 +01:00
Loïc Mathieu
a09f61fcfd fix(flow): flow trigger with both conditions and preconditions
When a flow have both a condition and a precondition, the condition was evaluated twice which lead to double execution triggered.

Fixes
2025-11-14 18:16:32 +01:00
Loïc Mathieu
687ce00d33 fix(test): increase indexing waiting sleep 2025-11-13 18:08:39 +01:00
Loïc Mathieu
133828bdf1 feat(core): remove deprecated runner property in favor or taskRunner
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
94d0975b78 feat(core): remove Property deprecated methdso and constructors
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
b6e44954c6 feat(flow): remove FILE input extension
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
0167f5f806 feat(flow): remove JSON flow support
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
97769faba7 feat(flow): remove state store
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-13 18:08:39 +01:00
Loïc Mathieu
e6b5c8ec77 chore(system): remove kafka stream 2025-11-10 12:26:46 +01:00
Loïc Mathieu
052120766e fix(system): trigger an execution once per condition on flow triggers
Fixes #12560
2025-11-10 12:26:46 +01:00
Loïc Mathieu
999719ea69 fix(core): remove PostgresSchedulerScheduleTest as other JDBC impl didn't have it 2025-11-10 12:26:46 +01:00
Loïc Mathieu
f0790af2e5 feat(system): refactor concurrency limit 2025-11-10 12:26:46 +01:00
Loïc Mathieu
8323691aa3 feat(system): move the DefaultServiceLivenessCoordinator to the executor
As it is only started by the executor it should be inside this module
2025-11-10 12:26:46 +01:00
Loïc Mathieu
1f50be8828 feat(system): move flow topoloigy in its own component 2025-11-10 12:26:46 +01:00
Loïc Mathieu
93de3ecbb0 fix(system): MySQL migration 2025-11-10 12:26:46 +01:00
Loïc Mathieu
a88db9b0ad feat(system): rename WorkerGroupExecutor to WorkerGroupMetaStore 2025-11-10 12:26:46 +01:00
Loïc Mathieu
1401cac418 feat(services): use a single service liveness coordinator 2025-11-10 12:26:46 +01:00
Loïc Mathieu
6e2aaaf8a0 feat(system): un-couple queues and repositories 2025-11-10 12:26:46 +01:00
Loïc Mathieu
ff5d07cef8 feat(system): queue indexer 2025-11-10 12:26:46 +01:00
Loïc Mathieu
b44a855aa5 feat(system): Executor v2 2025-11-10 12:26:46 +01:00
Loïc Mathieu
d499c621d6 fix(locks): tryLock should release the lock 2025-11-04 14:25:08 +01:00
Loïc Mathieu
f6944d4e45 feat(system): improve locks
- Switch LockException to be a runtime exception
- Implements a tryLock() mechanism so skip the runnable if it's already locked
2025-11-04 14:25:08 +01:00
Florian Hussonnois
7f17e42da2 refactor(system): extract JdbcQueuePoller class from JdbcQueue
Extract a JdbcQueueConfiguration and JdbcQueuePoller classes from
JdbcQueue to improve clarity, testability and reuse of the code.
2025-11-04 14:25:08 +01:00
Loïc Mathieu
9bea470010 feat(flows): remove deprecated Schedule.scheduleConditions
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:25:08 +01:00
Loïc Mathieu
9baf648a24 feat(flows): remove deprecated FlowCondition and FlowNamespaceCondition
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:25:08 +01:00
Loïc Mathieu
0bfbee9a8a feat(flows): remove deprecated MultipleCondition condition
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:25:08 +01:00
Loïc Mathieu
7f11774a5c fix(tests): add a sleep to be sure ES indexation happens before deleting 2025-11-04 14:25:08 +01:00
Loïc Mathieu
9693206374 feat(system): add a lock mechanism 2025-11-04 14:25:08 +01:00
Loïc Mathieu
2b1f81047a feat(flows): remove deprecated LocalFiles task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
9ce2541497 feat(flows): remove deprecated Pebble json function and filter
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
354ee5b233 feat(flows): remove deprecated EachParallel task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
7208aeec59 feat(flows): remove deprecated EachSequential
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
52a81a7547 feat(flows): remove deprecated flow update task endpoint
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Florian Hussonnois
a108d89c86 chore(core): add a core Disposable interface 2025-11-04 14:24:48 +01:00
Loïc Mathieu
e3a8811ed2 chore(system): switch new migrations to V3 2025-11-04 14:24:48 +01:00
Loïc Mathieu
efcd68dfd5 feat(system): remove deprecated code not used anymore 2025-11-04 14:24:48 +01:00
Loïc Mathieu
c5eccb6476 feat(system): remove task defaults
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
2a1118473e feat(flows): remove flow expand helper
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
d4244a4eb4 feat(flows): remove Templates
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:24:48 +01:00
Loïc Mathieu
5e4be69dc9 feat(flows): remove the deprecated Echo task
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
3b702597f5 feat(flows): remove deprecated ENUM inputs
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
03883bbeff feat(flows): remove deprecated BOOLEAN inputs
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
3231cd8b9c feat(flows): remove deprecated flow listeners
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
Loïc Mathieu
35b8364071 feat(flows): remove deprecated input name
Part-of: https://github.com/kestra-io/kestra-ee/issues/3238
2025-11-04 14:22:25 +01:00
brian.mulier
9294c9f885 chore(version): upgrade to v1.2.0-SNAPSHOT 2025-11-04 14:04:13 +01:00
brian.mulier
ee63c33ef3 fix(ci): failsafe global-create-new-release-branch.yml 2025-11-04 13:49:59 +01:00
Roman Acevedo
d620dd7dec test: set retryWithFlowableErrors as FlakyTest 2025-11-04 13:46:14 +01:00
brian.mulier
02425586d6 fix(ci): failsafe global-create-new-release-branch.yml 2025-11-04 13:44:44 +01:00
brian.mulier
56d48ddf32 fix(ci): failsafe global-create-new-release-branch.yml 2025-11-04 13:41:32 +01:00
brian.mulier
1a5c79827b fix(ci): failsafe global-create-new-release-branch.yml 2025-11-04 13:36:06 +01:00
Florian Hussonnois
08b20fda68 fix(core): resolution of plugin must be done with a stable version 2025-11-04 11:29:47 +01:00
François Delbrayelle
7192ad1494 doc(http/request): fix doc about basic auth (#12626) 2025-11-04 10:44:36 +01:00
YannC
f164cddf7a Fix/sdk changes (#12411) (#12617)
* Fix/sdk changes (#12411)

* fix: kv controller remove namespace check

* clean(API): add query to filter parameter

* fix: flow update not deprecated

* clean(API): add deprecated on open api

* feat: executions annotations for skipping, follow method generation in sdk

* feat: add typing indication to validateTask

* fix(flowController): set correct hidden for json method in

* fix: optional params in delete executions endpoints

* fix: inputs/outputs as object

* change KV schema type to be object

* add back , deprecated = false on flow update, otherwise its marked as deprecated

* Revert "add back , deprecated = false on flow update, otherwise its marked as deprecated"

This reverts commit 3772404b68f14f0a80af9e0adb9952d58e9102b4.

* feat(API): add multipart to openAPI

* feat(API): add multipart to openAPI

* fix: only use plain-text for setKeyValue endpoint

* fix: KV command test

* chore: add multipart vendor annotations for custom generation on SDK

---------

Co-authored-by: YannC. <ycoornaert@kestra.io>
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>

* fix: kv test remove content type

---------

Co-authored-by: Roman Acevedo <roman.acevedo62@gmail.com>
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-11-04 08:37:08 +01:00
Barthélémy Ledoux
c1e18eb490 refactor: make typescript progress on logs (#12603) 2025-11-03 14:31:26 +01:00
Miloš Paunović
4365a108ac chore(core): enhance github issue templates (#12572) 2025-11-03 10:48:03 +00:00
brian-mulier-p
bb0e15a2cc fix(cli): avoid resaving existing metadata upon migration (#12607) 2025-11-03 11:16:03 +01:00
brian-mulier-p
3ab6d6a94f feat(cli): have separate commands for KV & secrets metadata migrations (#12585) 2025-11-03 09:33:26 +01:00
Krie
e116186201 feat: make charts default duration configurable, add kestra.ui.charts… (#12599)
* feat: make charts default duration configurable, add kestra.ui.charts.default-duration config parameter (default: PT720H/30 days)

* Update cli/src/main/resources/application.yml

Co-authored-by: YannC <37600690+Skraye@users.noreply.github.com>

* Update ui/src/components/flows/FlowRoot.vue

Co-authored-by: YannC <37600690+Skraye@users.noreply.github.com>

* Update ui/src/routes/routes.js

Co-authored-by: YannC <37600690+Skraye@users.noreply.github.com>

* feat: set P30D instead of PT720H for readability

---------

Co-authored-by: YannC <37600690+Skraye@users.noreply.github.com>
Co-authored-by: YannC. <ycoornaert@kestra.io>
2025-11-03 09:21:39 +01:00
Barthélémy Ledoux
6439671b91 fix: on producion, switch appear without fields (#12579) 2025-11-01 22:37:53 +01:00
Piyush Bhaskar
c044634381 fix(core): allow to show multiple labels and few cleanup (#12587) 2025-11-01 01:06:39 +05:30
Roman Acevedo
776ea0a93c ci: add dry run to release-docker.yml (#12586) 2025-10-31 20:02:21 +01:00
Piyush Bhaskar
a799ef8b64 fix(core): fix the pagination (#12569) 2025-10-31 20:57:17 +05:30
Loïc Mathieu
e2e4335771 fix(flows): fail flow validation for duplicate preconditions ID 2025-10-31 16:22:13 +01:00
Loïc Mathieu
f8b0d4217f fix(executions): Flow triggered twice when there are two multiple conditions
Fixes #12560
2025-10-31 16:22:13 +01:00
Pradumna Saraf
c594aa6764 chore(plugin): improve the OutputValues example 2025-10-31 15:15:44 +01:00
Piyush Bhaskar
d09bf5ac96 fix(core): lets have separate key and value input for labels for EQUALS, NOT_EQUALS operator (#12577) 2025-10-31 19:24:58 +05:30
Ishani Kundu
ef0a4e6b1a fix: Decrease the spacing between top of the filters and the bottom of the header (#12573)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-10-31 17:35:34 +05:30
Piyush Bhaskar
5f81c19fc7 refactor(triggers): clean usage of trigger_state (#12568) 2025-10-31 16:36:03 +05:30
Mohammad Zaki
701f7e22d8 refactor(core): convert LogLevelSelector.vue to TypeScript and Composition API (#12556)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-10-31 15:13:34 +05:30
Miloš Paunović
4bf469c992 fix(core): ensure "clicking fix with ai" button always shows and focuses the code panel (#12565)
Closes https://github.com/kestra-io/kestra/issues/12504.
2025-10-31 10:27:11 +01:00
Roman Acevedo
71e49f9eb5 feat(executions): add IN, NOT_IN, CONTAINS LABELS #11916
- advance on https://github.com/kestra-io/kestra/issues/11587
- companion PR: https://github.com/kestra-io/kestra-ee/pull/5617
2025-10-31 10:20:05 +01:00
Ishani Kundu
76e9b2269f refactor(core): convert Plugin.vue component to ts with Composition API (#12559)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-10-31 14:45:48 +05:30
github-actions[bot]
c3f34e1c2a chore(core): localize to languages other than english (#12555)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-10-31 08:10:36 +01:00
Ludovic DEHON
e01e8d8fe0 feat(tasks): add a params for http tasks
close #12507
2025-10-31 00:40:49 +01:00
Dnyanesh Pise
7c5092f281 fix(ui): prevent marking fields as error on login (Fix #12548) (#12554) 2025-10-30 23:34:34 +05:30
Loïc Mathieu
e025677e70 fix(executions): set the execution to KILLING and not RESTARTED when killing a paused flow
Fixes https://github.com/kestra-io/kestra/issues/12417
2025-10-30 18:11:17 +01:00
Bart Ledoux
a3195c8e64 feat(ui): add concurency limit page and route 2025-10-30 17:49:33 +01:00
Loïc Mathieu
9920d190c8 feat(system): add an administration page for concurrency limit
Closes  #11250
2025-10-30 17:49:33 +01:00
brian.mulier
2b29a36850 fix(kv): get value doesn't need metadata migration
Also purge expired kv in metadata migrate command
2025-10-30 16:57:57 +01:00
brian.mulier
07e90de835 fix(core): CrudEvent should not be done on the repository side for KV 2025-10-30 16:57:57 +01:00
Loïc Mathieu
1c097209ac fix(flows): subflow validation could fail in Elasticsearch
As you cannot eq on a null field
2025-10-30 16:29:41 +01:00
Krie
ca70743329 chore(core): clarify usage of vite environment variables in docs (#12520)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-10-30 16:18:37 +01:00
github-actions[bot]
5d2c93b232 chore(core): localize to languages other than english (#12550)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-10-30 16:16:04 +01:00
Loïc Mathieu
bc7291b8e3 chore(deps): fix OpenTelemetry proto so it works with Protobuf 3
Fixes https://github.com/kestra-io/kestra/issues/12298
2025-10-30 15:47:10 +01:00
Loïc Mathieu
c06ffb3063 feat(system): set taskrun attempt to resubmitted when a taskrun is resubmitted to a worker
Closes https://github.com/kestra-io/kestra/issues/12481
2025-10-30 15:46:05 +01:00
Barthélémy Ledoux
7c89eec500 fix(nocode): switch statements should display corectly (#12509)
Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-10-30 15:34:15 +01:00
Ritoban Dutta
45592597e7 chore: convert StateChart.vue component to TypeScript with Composition API (#12537)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-10-30 17:32:47 +05:30
Hemant M Mehta
313fda153a fix: avoid reusing jq scope and revert unnecessary test changes
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
6c3bbcea4d fix: Move Scope initialization to static block
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
53b46f11aa fix: Increase wait timeout to 120s for restartFlowable tests to fix timing issues
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
9396e73f5a fix: Update ExecutionServiceTest replayFlowable
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
d02b6b0470 fix: static call issue
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
bdfd324a7d fix: update to version
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
551f6fe033 fix: updated the version
Signed-off-by: Hemant M Mehta <hemant29mehta@gmail.com>
2025-10-30 12:53:44 +01:00
Hemant M Mehta
7a0b3843e1 fix: jq-filter-zip-exception
closes: #11683
2025-10-30 12:53:44 +01:00
Dheeraj_R_Gowda
d713f2753b refactor(ui): convert TemplateEdit.vue to TypeScript using script setup (#12530)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-10-30 16:55:35 +05:30
Loïc Mathieu
bc27e0ea9e chore(executions): deprecate the state store for the KVStore 2025-10-30 11:30:36 +01:00
Piyush Bhaskar
08f4b2ea22 fix(core): properly apply the saved filter containing date range. (#12541) 2025-10-30 15:31:43 +05:30
Barthélémy Ledoux
b64168f115 fix: save tabs state onCreate (#12517) 2025-10-30 10:44:53 +01:00
Piyush Bhaskar
b23aa3eb1a fix(core): make popper open fast (#12533) 2025-10-30 12:51:47 +05:30
Piyush Bhaskar
70b5c03fb2 fix(core): remove the scope from Logs and show the chart by default (#12532) 2025-10-30 12:39:22 +05:30
Piyush Bhaskar
094802dd85 fix(filter): make reset and reset to default of pre applied filter robust. (#12508) 2025-10-30 12:32:20 +05:30
brian.mulier
d9144c8c4f feat(core): introduce KV Metadata in-repository storing (#12342)
part of https://github.com/kestra-io/kestra/issues/12341
2025-10-29 17:18:43 +01:00
brian.mulier
b18d304b77 fix(kv): properly serialize durations 2025-10-29 17:18:43 +01:00
brian.mulier
c38cac5a9d fix(tests): concurrency-safe Template emits 2025-10-29 17:18:43 +01:00
brian.mulier
4ed44754ab fix(core): use index by adding deleted everytime in query 2025-10-29 17:18:43 +01:00
Florian Hussonnois
e62baaabe4 fix(core): fix PluginCatalogService resolve method 2025-10-29 17:10:32 +01:00
Nicolas K.
efac416863 feat(core): force telemetry when license requires it (#12512)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-10-29 17:03:45 +01:00
François Delbrayelle
d26956fc89 doc(http/request): fix doc about basic auth (#12510) 2025-10-29 16:56:27 +01:00
Diksha Ajaykumar Nigam
03a5c52445 refactor(ui): convert LeftMenu.vue to TypeScript using script setup (#12440) 2025-10-29 15:49:24 +01:00
Barthélémy Ledoux
290e0c5ded fix: avoid refreshing token when impersonated (#12476) 2025-10-29 15:47:52 +01:00
Ashwini Kumar
1c0e0fd926 refactor(ui): Convert LeftMenuLink.vue to TypeScript (#12431) 2025-10-29 15:27:19 +01:00
YannC
9042e86f12 fix: make sure taskOutputs is never set as a Variables map (#12484)
close #11967
2025-10-29 15:25:09 +01:00
Barthélémy Ledoux
c6be8798d6 fix: show images in editor (#12503)
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
2025-10-29 15:16:23 +01:00
Barthélémy Ledoux
452ac83b01 fix: open tasks from topo in other panel (#12432) 2025-10-29 15:15:33 +01:00
dependabot[bot]
3dd198f036 build(deps): bump software.amazon.awssdk:bom from 2.35.11 to 2.36.3
Bumps software.amazon.awssdk:bom from 2.35.11 to 2.36.3.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.36.3
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 15:01:51 +01:00
Florian Hussonnois
228863d91a fix(test): fix test on FlowValidationTest 2025-10-29 14:19:56 +01:00
dependabot[bot]
8b17a7c36d build(deps): bump dev.langchain4j:langchain4j-community-bom
Bumps [dev.langchain4j:langchain4j-community-bom](https://github.com/langchain4j/langchain4j-community) from 1.7.1-beta14 to 1.8.0-beta15.
- [Release notes](https://github.com/langchain4j/langchain4j-community/releases)
- [Commits](https://github.com/langchain4j/langchain4j-community/compare/1.7.1-beta14...1.8.0-beta15)

---
updated-dependencies:
- dependency-name: dev.langchain4j:langchain4j-community-bom
  dependency-version: 1.8.0-beta15
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 14:17:02 +01:00
Loïc Mathieu
55a8896181 chore(tests): set FlowControllerTest.updateFlowFlowFromJsonFromString as flaky
It fails often in CI and as the tested endpoint is deprecated it is not important that the test pass.
2025-10-29 14:12:32 +01:00
dependabot[bot]
fc600cc1e3 build(deps): bump dev.langchain4j:langchain4j-bom from 1.7.1 to 1.8.0
Bumps [dev.langchain4j:langchain4j-bom](https://github.com/langchain4j/langchain4j) from 1.7.1 to 1.8.0.
- [Release notes](https://github.com/langchain4j/langchain4j/releases)
- [Commits](https://github.com/langchain4j/langchain4j/compare/1.7.1...1.8.0)

---
updated-dependencies:
- dependency-name: dev.langchain4j:langchain4j-bom
  dependency-version: 1.8.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 14:11:54 +01:00
dependabot[bot]
fa23081207 build(deps): bump @swc/core-linux-x64-gnu from 1.13.21 to 1.14.0 in /ui (#12494)
Bumps [@swc/core-linux-x64-gnu](https://github.com/swc-project/swc) from 1.13.21 to 1.14.0.
- [Release notes](https://github.com/swc-project/swc/releases)
- [Changelog](https://github.com/swc-project/swc/blob/main/CHANGELOG.md)
- [Commits](https://github.com/swc-project/swc/compare/v1.13.21...v1.14.0)

---
updated-dependencies:
- dependency-name: "@swc/core-linux-x64-gnu"
  dependency-version: 1.14.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-29 13:16:38 +01:00
Piyush Bhaskar
2b04192d1b fix(core): avoid hot reload on enter in input (#12490) 2025-10-29 17:46:29 +05:30
dependabot[bot]
b7fbdf8aed build(deps): bump @swc/core-darwin-x64 from 1.13.21 to 1.14.0 in /ui (#12495)
Bumps [@swc/core-darwin-x64](https://github.com/swc-project/swc) from 1.13.21 to 1.14.0.
- [Release notes](https://github.com/swc-project/swc/releases)
- [Changelog](https://github.com/swc-project/swc/blob/main/CHANGELOG.md)
- [Commits](https://github.com/swc-project/swc/compare/v1.13.21...v1.14.0)

---
updated-dependencies:
- dependency-name: "@swc/core-darwin-x64"
  dependency-version: 1.14.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-29 13:15:40 +01:00
dependabot[bot]
5a95fcf1ff build(deps): bump @swc/core-darwin-arm64 from 1.13.21 to 1.14.0 in /ui (#12489)
Bumps [@swc/core-darwin-arm64](https://github.com/swc-project/swc) from 1.13.21 to 1.14.0.
- [Release notes](https://github.com/swc-project/swc/releases)
- [Changelog](https://github.com/swc-project/swc/blob/main/CHANGELOG.md)
- [Commits](https://github.com/swc-project/swc/compare/v1.13.21...v1.14.0)

---
updated-dependencies:
- dependency-name: "@swc/core-darwin-arm64"
  dependency-version: 1.14.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-29 13:13:27 +01:00
Miloš Paunović
558ca24dac chore(deps): regular dependency update (#12473)
Performing a weekly round of dependency updates in the NPM ecosystem to keep everything up to date.
2025-10-29 13:03:00 +01:00
dependabot[bot]
1ffc60fe07 build(deps): bump org.sonarqube from 7.0.0.6105 to 7.0.1.6134
Bumps org.sonarqube from 7.0.0.6105 to 7.0.1.6134.

---
updated-dependencies:
- dependency-name: org.sonarqube
  dependency-version: 7.0.1.6134
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 12:39:14 +01:00
Your Name
4cdbb5f57e Remove duplicate test and explicitly use Integer.valueOf in chunkWithIntegerVariable 2025-10-29 12:36:00 +01:00
Your Name
3f27645b3c test(core): add reproducer for Integer to Long casting issue in chunk filter 2025-10-29 12:36:00 +01:00
Your Name
a897618108 fix(core): handle integer size in chunk Pebble filter 2025-10-29 12:36:00 +01:00
Piyush Bhaskar
cb9662cbd7 fix(core): tweaks for dropdown bg and shadow (#12479) 2025-10-29 16:14:27 +05:30
Naveen Gowda MY
c60be5c9f8 feat: add error feedback and validation to login form Fixes #12361 (#12472)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
2025-10-29 15:44:15 +05:30
dependabot[bot]
ec74c1ae51 build(deps): bump com.mysql:mysql-connector-j from 9.4.0 to 9.5.0
Bumps [com.mysql:mysql-connector-j](https://github.com/mysql/mysql-connector-j) from 9.4.0 to 9.5.0.
- [Changelog](https://github.com/mysql/mysql-connector-j/blob/release/9.x/CHANGES)
- [Commits](https://github.com/mysql/mysql-connector-j/compare/9.4.0...9.5.0)

---
updated-dependencies:
- dependency-name: com.mysql:mysql-connector-j
  dependency-version: 9.5.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 11:02:06 +01:00
dependabot[bot]
ded9e8c13a build(deps): bump software.amazon.awssdk.crt:aws-crt
Bumps [software.amazon.awssdk.crt:aws-crt](https://github.com/awslabs/aws-crt-java) from 0.39.3 to 0.39.4.
- [Release notes](https://github.com/awslabs/aws-crt-java/releases)
- [Commits](https://github.com/awslabs/aws-crt-java/compare/v0.39.3...v0.39.4)

---
updated-dependencies:
- dependency-name: software.amazon.awssdk.crt:aws-crt
  dependency-version: 0.39.4
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 11:00:40 +01:00
dependabot[bot]
fcb2d18beb build(deps): bump actions/upload-artifact from 4 to 5
Bumps [actions/upload-artifact](https://github.com/actions/upload-artifact) from 4 to 5.
- [Release notes](https://github.com/actions/upload-artifact/releases)
- [Commits](https://github.com/actions/upload-artifact/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/upload-artifact
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 11:00:09 +01:00
dependabot[bot]
c3bc919891 build(deps): bump com.google.cloud:libraries-bom from 26.70.0 to 26.71.0
Bumps [com.google.cloud:libraries-bom](https://github.com/googleapis/java-cloud-bom) from 26.70.0 to 26.71.0.
- [Release notes](https://github.com/googleapis/java-cloud-bom/releases)
- [Changelog](https://github.com/googleapis/java-cloud-bom/blob/main/release-please-config.json)
- [Commits](https://github.com/googleapis/java-cloud-bom/compare/v26.70.0...v26.71.0)

---
updated-dependencies:
- dependency-name: com.google.cloud:libraries-bom
  dependency-version: 26.71.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-10-29 10:59:40 +01:00
brian-mulier-p
03542e91f3 fix(core): show tasks in JSON Schema for Switch.cases (#12478)
part of #10508
2025-10-29 10:26:38 +01:00
Florian Hussonnois
958ee1ef8a fix(system): add resolveVersions method to PluginCatalogService
Related-to: kestra-io/kestra-ee#5171
2025-10-29 10:16:59 +01:00
Piyush Bhaskar
a27348b872 feat(core): add support for single datetime as well (#12471) 2025-10-29 13:35:37 +05:30
Roman Acevedo
36aedec8f0 ci: add skip test param to pre-release.yml 2025-10-28 17:53:57 +01:00
Pratik Murari
9499cfc955 refactor: convert RouterMd.vue component to TypeScript (#12429)
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
2025-10-28 16:41:32 +01:00
Anna Geller
d3d14a252b feat: add cloud formation template to deploy Kestra (#12412)
* feat: add cloud formation template to deploy Kestra

* Update README.md
2025-10-28 15:45:43 +01:00
Ridham Anand
425af2a530 refactor(layout): convert DefaultLayout.vue to TypeScript using Composition API (#12424)
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-10-28 15:32:57 +01:00
Arya Soni
0bae8cdbe9 Convert ErrorToastContainer.vue component to TS (#12418)
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-10-28 15:28:41 +01:00
Ravi kumar
b9a5a74674 refactor(ui): convert Settings.vue to TS composition API (#12413)
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-10-28 15:24:19 +01:00
Piyush Bhaskar
222fae2a22 fix(core): add dynamic rowKey for selection handling (#12428) 2025-10-28 19:11:14 +05:30
Loïc Mathieu
4502c52d2b fix(executions): remove errors and finally tasks when restarting
Otherwize we would detect that an error or a finally branch is processing and the flowable state would not be correctly taken.

Moreover, it prevent this branch to be taken again after a restart.

Fixes #11731
2025-10-28 14:29:27 +01:00
Florian Hussonnois
153ac27040 fix(flows): KV pebble expressions with input defaults (#12314)
Fixes: #12314
2025-10-28 14:29:03 +01:00
Florian Hussonnois
6361a02deb feat(core): add prefill prop to input to allow nullable value (#11819)
Added a new 'prefill' property for all inputs
to specify an optional UI hint for pre-filling the input,while
allowing the input to be nullable.

Fixes: #11819
2025-10-28 14:21:26 +01:00
600 changed files with 12249 additions and 14402 deletions

View File

@@ -32,11 +32,6 @@ In the meantime, you can move onto the next step...
### Development:
- (Optional) By default, your dev server will target `localhost:8080`. If your backend is running elsewhere, you can create `.env.development.local` under `ui` folder with this content:
```
VITE_APP_API_URL={myApiUrl}
```
- Navigate into the `ui` folder and run `npm install` to install the dependencies for the frontend project.
- Now go to the `cli/src/main/resources` folder and create a `application-override.yml` file.

View File

@@ -1,5 +1,8 @@
name: Bug report
description: File a bug report
description: Report a bug or unexpected behavior in the project
labels: ["bug", "area/backend", "area/frontend"]
body:
- type: markdown
attributes:
@@ -20,7 +23,3 @@ body:
- Kestra Version: develop
validations:
required: false
labels:
- bug
- area/backend
- area/frontend

View File

@@ -1,4 +1,4 @@
contact_links:
- name: Chat
url: https://kestra.io/slack
about: Chat with us on Slack.
about: Chat with us on Slack

View File

@@ -1,5 +1,8 @@
name: Feature request
description: Create a new feature request
description: Suggest a new feature or improvement to enhance the project
labels: ["enhancement", "area/backend", "area/frontend"]
body:
- type: textarea
attributes:
@@ -7,7 +10,3 @@ body:
placeholder: Tell us more about your feature request. Don't forget to give us a star! ⭐
validations:
required: true
labels:
- enhancement
- area/backend
- area/frontend

View File

@@ -64,7 +64,8 @@ jobs:
cd kestra
# Create and push release branch
git checkout -b "$PUSH_RELEASE_BRANCH";
git checkout -B "$PUSH_RELEASE_BRANCH";
git pull origin "$PUSH_RELEASE_BRANCH" --rebase || echo "No existing branch to pull";
git push -u origin "$PUSH_RELEASE_BRANCH";
# Run gradle release

View File

@@ -5,6 +5,15 @@ on:
tags:
- 'v*'
workflow_dispatch:
inputs:
skip-test:
description: 'Skip test'
type: choice
required: true
default: 'false'
options:
- "true"
- "false"
jobs:
build-artifacts:
@@ -14,6 +23,7 @@ jobs:
backend-tests:
name: Backend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-backend-tests.yml@main
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
@@ -23,6 +33,7 @@ jobs:
frontend-tests:
name: Frontend tests
uses: kestra-io/actions/.github/workflows/kestra-oss-frontend-tests.yml@main
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
secrets:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

View File

@@ -13,11 +13,11 @@ on:
required: true
type: boolean
default: false
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
dry-run:
description: 'Dry run mode that will not write or release anything'
required: true
type: boolean
default: false
jobs:
publish-docker:
@@ -25,9 +25,9 @@ jobs:
if: startsWith(github.ref, 'refs/tags/v')
uses: kestra-io/actions/.github/workflows/kestra-oss-publish-docker.yml@main
with:
plugin-version: ${{ inputs.plugin-version }}
retag-latest: ${{ inputs.retag-latest }}
retag-lts: ${{ inputs.retag-lts }}
dry-run: ${{ inputs.dry-run }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}

View File

@@ -43,7 +43,7 @@ jobs:
# Upload dependency check report
- name: Upload dependency check report
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v5
if: ${{ always() }}
with:
name: dependency-check-report

View File

@@ -68,6 +68,12 @@ Kestra is an open-source, event-driven orchestration platform that makes both **
## 🚀 Quick Start
### Launch on AWS (CloudFormation)
Deploy Kestra on AWS using our CloudFormation template:
[![Launch Stack](https://cdn.rawgit.com/buildkite/cloudformation-launch-stack-button-svg/master/launch-stack.svg)](https://console.aws.amazon.com/cloudformation/home#/stacks/create/review?templateURL=https://kestra-deployment-templates.s3.eu-west-3.amazonaws.com/aws/cloudformation/ec2-rds-s3/kestra-oss.yaml&stackName=kestra-oss)
### Get Started Locally in 5 Minutes
#### Launch Kestra in Docker

View File

@@ -21,7 +21,7 @@ plugins {
// test
id "com.adarshr.test-logger" version "4.0.0"
id "org.sonarqube" version "7.0.0.6105"
id "org.sonarqube" version "7.0.1.6134"
id 'jacoco-report-aggregation'
// helper

View File

@@ -7,7 +7,6 @@ import io.kestra.cli.commands.namespaces.NamespaceCommand;
import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand;
import io.kestra.cli.commands.sys.SysCommand;
import io.kestra.cli.commands.templates.TemplateCommand;
import io.micronaut.configuration.picocli.MicronautFactory;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
@@ -39,7 +38,6 @@ import java.util.concurrent.Callable;
PluginCommand.class,
ServerCommand.class,
FlowCommand.class,
TemplateCommand.class,
SysCommand.class,
ConfigCommand.class,
NamespaceCommand.class,

View File

@@ -4,6 +4,7 @@ import io.kestra.core.runners.*;
import io.kestra.core.server.Service;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.executor.DefaultExecutor;
import io.kestra.worker.DefaultWorker;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
@@ -49,7 +50,7 @@ public class StandAloneRunner implements Runnable, AutoCloseable {
running.set(true);
poolExecutor = executorsUtils.cachedThreadPool("standalone-runner");
poolExecutor.execute(applicationContext.getBean(ExecutorInterface.class));
poolExecutor.execute(applicationContext.getBean(DefaultExecutor.class));
if (workerEnabled) {
// FIXME: For backward-compatibility with Kestra 0.15.x and earliest we still used UUID for Worker ID instead of IdUtils

View File

@@ -1,36 +0,0 @@
package io.kestra.cli.commands.flows;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlParser;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "expand",
description = "Deprecated - expand a flow"
)
@Deprecated
public class FlowExpandCommand extends AbstractCommand {
@CommandLine.Parameters(index = "0", description = "The flow file to expand")
private Path file;
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
super.call();
stdErr("Warning, this functionality is deprecated and will be removed at some point.");
String content = IncludeHelperExpander.expand(Files.readString(file), file.getParent());
Flow flow = YamlParser.parse(content, Flow.class);
modelValidator.validate(flow);
stdOut(content);
return 0;
}
}

View File

@@ -21,6 +21,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "updates",
description = "Create or update flows from a folder, and optionally delete the ones not present",
@@ -41,7 +43,6 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
@Inject
private TenantIdSelectorService tenantIdSelectorService;
@SuppressWarnings("deprecation")
@Override
public Integer call() throws Exception {
super.call();
@@ -50,13 +51,7 @@ public class FlowUpdatesCommand extends AbstractApiCommand {
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.toList();
String body = "";

View File

@@ -1,40 +0,0 @@
package io.kestra.cli.commands.flows;
import com.google.common.io.Files;
import lombok.SneakyThrows;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
@Deprecated
public abstract class IncludeHelperExpander {
public static String expand(String value, Path directory) throws IOException {
return value.lines()
.map(line -> line.contains("[[>") && line.contains("]]") ? expandLine(line, directory) : line)
.collect(Collectors.joining("\n"));
}
@SneakyThrows
private static String expandLine(String line, Path directory) {
String prefix = line.substring(0, line.indexOf("[[>"));
String suffix = line.substring(line.indexOf("]]") + 2, line.length());
String file = line.substring(line.indexOf("[[>") + 3 , line.indexOf("]]")).strip();
Path includePath = directory.resolve(file);
List<String> include = Files.readLines(includePath.toFile(), Charset.defaultCharset());
// handle single line directly with the suffix (should be between quotes or double-quotes
if(include.size() == 1) {
String singleInclude = include.getFirst();
return prefix + singleInclude + suffix;
}
// multi-line will be expanded with the prefix but no suffix
return include.stream()
.map(includeLine -> prefix + includeLine)
.collect(Collectors.joining("\n"));
}
}

View File

@@ -2,7 +2,6 @@ package io.kestra.cli.commands.flows.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.commands.flows.IncludeHelperExpander;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
@@ -21,6 +20,8 @@ import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.List;
import static io.kestra.core.utils.Rethrow.throwFunction;
@CommandLine.Command(
name = "update",
description = "Update flows in namespace",
@@ -44,13 +45,7 @@ public class FlowNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCo
List<String> flows = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> {
try {
return IncludeHelperExpander.expand(Files.readString(path, Charset.defaultCharset()), path.getParent());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(throwFunction(path -> Files.readString(path, Charset.defaultCharset())))
.toList();
String body = "";

View File

@@ -1,28 +0,0 @@
package io.kestra.cli.commands.migrations;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "metadata",
description = "populate metadata for entities"
)
@Slf4j
public class MetadataMigrationCommand extends AbstractCommand {
@Inject
private MetadataMigrationService metadataMigrationService;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationService.migrateMetadata();
System.out.println("✅ Metadata migration complete.");
return 0;
} catch (Exception e) {
return 1;
}
}
}

View File

@@ -1,11 +0,0 @@
package io.kestra.cli.commands.migrations;
import jakarta.inject.Singleton;
@Singleton
public class MetadataMigrationService {
public int migrateMetadata() {
// no-op
return 0;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.cli.commands.migrations;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.migrations.metadata.MetadataMigrationCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

View File

@@ -0,0 +1,30 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "kv",
description = "populate metadata for KV"
)
@Slf4j
public class KvMetadataMigrationCommand extends AbstractCommand {
@Inject
private MetadataMigrationService metadataMigrationService;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationService.kvMigration();
} catch (Exception e) {
System.err.println("❌ KV Metadata migration failed: " + e.getMessage());
e.printStackTrace();
return 1;
}
System.out.println("✅ KV Metadata migration complete.");
return 0;
}
}

View File

@@ -0,0 +1,23 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "metadata",
description = "populate metadata for entities",
subcommands = {
KvMetadataMigrationCommand.class,
SecretsMetadataMigrationCommand.class
}
)
@Slf4j
public class MetadataMigrationCommand extends AbstractCommand {
@Override
public Integer call() throws Exception {
super.call();
return 0;
}
}

View File

@@ -0,0 +1,89 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.tenant.TenantService;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;
@Singleton
public class MetadataMigrationService {
@Inject
private TenantService tenantService;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private KvMetadataRepositoryInterface kvMetadataRepository;
@Inject
private StorageInterface storageInterface;
protected Map<String, List<String>> namespacesPerTenant() {
String tenantId = tenantService.resolveTenant();
return Map.of(tenantId, flowRepository.findDistinctNamespace(tenantId));
}
public void kvMigration() throws IOException {
this.namespacesPerTenant().entrySet().stream()
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
.flatMap(throwFunction(namespaceForTenant -> {
InternalKVStore kvStore = new InternalKVStore(namespaceForTenant.getKey(), namespaceForTenant.getValue(), storageInterface, kvMetadataRepository);
List<FileAttributes> list = listAllFromStorage(storageInterface, namespaceForTenant.getKey(), namespaceForTenant.getValue());
Map<Boolean, List<KVEntry>> entriesByIsExpired = list.stream()
.map(throwFunction(fileAttributes -> KVEntry.from(namespaceForTenant.getValue(), fileAttributes)))
.collect(Collectors.partitioningBy(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isAfter(expirationDate)).orElse(false)));
entriesByIsExpired.get(true).forEach(kvEntry -> {
try {
storageInterface.delete(
namespaceForTenant.getKey(),
namespaceForTenant.getValue(),
kvStore.storageUri(kvEntry.key())
);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return entriesByIsExpired.get(false).stream().map(kvEntry -> PersistedKvMetadata.from(namespaceForTenant.getKey(), kvEntry));
}))
.forEach(throwConsumer(kvMetadata -> {
if (kvMetadataRepository.findByName(kvMetadata.getTenantId(), kvMetadata.getNamespace(), kvMetadata.getName()).isEmpty()) {
kvMetadataRepository.save(kvMetadata);
}
}));
}
public void secretMigration() throws Exception {
throw new UnsupportedOperationException("Secret migration is not needed in the OSS version");
}
private static List<FileAttributes> listAllFromStorage(StorageInterface storage, String tenant, String namespace) throws IOException {
try {
return storage.list(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace)));
} catch (FileNotFoundException e) {
return Collections.emptyList();
}
}
}

View File

@@ -0,0 +1,30 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.AbstractCommand;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "secrets",
description = "populate metadata for secrets"
)
@Slf4j
public class SecretsMetadataMigrationCommand extends AbstractCommand {
@Inject
private MetadataMigrationService metadataMigrationService;
@Override
public Integer call() throws Exception {
super.call();
try {
metadataMigrationService.secretMigration();
} catch (Exception e) {
System.err.println("❌ Secrets Metadata migration failed: " + e.getMessage());
e.printStackTrace();
return 1;
}
System.out.println("✅ Secrets Metadata migration complete.");
return 0;
}
}

View File

@@ -62,7 +62,7 @@ public class KvUpdateCommand extends AbstractApiCommand {
Duration ttl = expiration == null ? null : Duration.parse(expiration);
MutableHttpRequest<String> request = HttpRequest
.PUT(apiUri("/namespaces/", tenantService.getTenantId(tenantId)) + namespace + "/kv/" + key, value)
.contentType(MediaType.APPLICATION_JSON_TYPE);
.contentType(MediaType.TEXT_PLAIN);
if (ttl != null) {
request.header("ttl", ttl.toString());

View File

@@ -2,7 +2,7 @@ package io.kestra.cli.commands.servers;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.ServerType;
import io.kestra.core.runners.ExecutorInterface;
import io.kestra.core.runners.Executor;
import io.kestra.core.services.SkipExecutionService;
import io.kestra.core.services.StartExecutorService;
import io.kestra.core.utils.Await;
@@ -64,7 +64,7 @@ public class ExecutorCommand extends AbstractServerCommand {
super.call();
ExecutorInterface executorService = applicationContext.getBean(ExecutorInterface.class);
Executor executorService = applicationContext.getBean(Executor.class);
executorService.run();
Await.until(() -> !this.applicationContext.isRunning());

View File

@@ -7,7 +7,7 @@ import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.services.ConcurrencyLimitService;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStorage;
import io.kestra.jdbc.runner.AbstractJdbcExecutionQueuedStateStore;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -47,7 +47,7 @@ public class SubmitQueuedCommand extends AbstractCommand {
return 1;
}
else if (queueType.get().equals("postgres") || queueType.get().equals("mysql") || queueType.get().equals("h2")) {
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStorage.class);
var executionQueuedStorage = applicationContext.getBean(AbstractJdbcExecutionQueuedStateStore.class);
var concurrencyLimitService = applicationContext.getBean(ConcurrencyLimitService.class);
for (ExecutionQueued queued : executionQueuedStorage.getAllForAllTenants()) {

View File

@@ -1,7 +1,6 @@
package io.kestra.cli.commands.sys;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.kestra.cli.commands.sys.statestore.StateStoreCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.extern.slf4j.Slf4j;
import io.kestra.cli.AbstractCommand;
@@ -16,7 +15,6 @@ import picocli.CommandLine;
ReindexCommand.class,
DatabaseCommand.class,
SubmitQueuedCommand.class,
StateStoreCommand.class
}
)
@Slf4j

View File

@@ -1,27 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import picocli.CommandLine;
@CommandLine.Command(
name = "state-store",
description = "Manage Kestra State Store",
mixinStandardHelpOptions = true,
subcommands = {
StateStoreMigrateCommand.class,
}
)
public class StateStoreCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "sys", "state-store", "--help");
return 0;
}
}

View File

@@ -1,81 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Slugify;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@CommandLine.Command(
name = "migrate",
description = "Migrate old state store files to use the new KV Store implementation.",
mixinStandardHelpOptions = true
)
@Slf4j
public class StateStoreMigrateCommand extends AbstractCommand {
@Inject
private ApplicationContext applicationContext;
@Override
public Integer call() throws Exception {
super.call();
FlowRepositoryInterface flowRepository = this.applicationContext.getBean(FlowRepositoryInterface.class);
StorageInterface storageInterface = this.applicationContext.getBean(StorageInterface.class);
RunContextFactory runContextFactory = this.applicationContext.getBean(RunContextFactory.class);
flowRepository.findAllForAllTenants().stream().map(flow -> Map.entry(flow, List.of(
URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of(flow.getId()) + "/states"),
URI.create("/" + flow.getNamespace().replace(".", "/") + "/states")
))).map(potentialStateStoreUrisForAFlow -> Map.entry(potentialStateStoreUrisForAFlow.getKey(), potentialStateStoreUrisForAFlow.getValue().stream().flatMap(uri -> {
try {
return storageInterface.allByPrefix(potentialStateStoreUrisForAFlow.getKey().getTenantId(), potentialStateStoreUrisForAFlow.getKey().getNamespace(), uri, false).stream();
} catch (IOException e) {
return Stream.empty();
}
}).toList())).forEach(stateStoreFileUrisForAFlow -> stateStoreFileUrisForAFlow.getValue().forEach(stateStoreFileUri -> {
Flow flow = stateStoreFileUrisForAFlow.getKey();
String[] flowQualifierWithStateQualifiers = stateStoreFileUri.getPath().split("/states/");
String[] statesUriPart = flowQualifierWithStateQualifiers[1].split("/");
String stateName = statesUriPart[0];
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
String stateSubName = statesUriPart[statesUriPart.length - 1];
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
storageInterface.delete(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
stdOut("Successfully ran the state-store migration.");
return 0;
}
private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
Map<String, String> flowVariables = new HashMap<>();
flowVariables.put("tenantId", flow.getTenantId());
flowVariables.put("id", flow.getId());
flowVariables.put("namespace", flow.getNamespace());
return runContextFactory.of(flow, Map.of("flow", flowVariables));
}
}

View File

@@ -1,34 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceCommand;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "template",
description = "Manage templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceCommand.class,
TemplateValidateCommand.class,
TemplateExportCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "template", "--help");
return 0;
}
}

View File

@@ -1,61 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractApiCommand;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.nio.file.Path;
@CommandLine.Command(
name = "export",
description = "Export templates to a ZIP file",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateExportCommand extends AbstractApiCommand {
private static final String DEFAULT_FILE_NAME = "templates.zip";
@Inject
private TenantIdSelectorService tenantService;
@CommandLine.Option(names = {"--namespace"}, description = "The namespace of templates to export")
public String namespace;
@CommandLine.Parameters(index = "0", description = "The directory to export the file to")
public Path directory;
@Override
public Integer call() throws Exception {
super.call();
try(DefaultHttpClient client = client()) {
MutableHttpRequest<Object> request = HttpRequest
.GET(apiUri("/templates/export/by-query", tenantService.getTenantId(tenantId)) + (namespace != null ? "?namespace=" + namespace : ""))
.accept(MediaType.APPLICATION_OCTET_STREAM);
HttpResponse<byte[]> response = client.toBlocking().exchange(this.requestOptions(request), byte[].class);
Path zipFile = Path.of(directory.toString(), DEFAULT_FILE_NAME);
zipFile.toFile().createNewFile();
Files.write(zipFile, response.body());
stdOut("Exporting template(s) for namespace '" + namespace + "' successfully done !");
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -1,35 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.models.validations.ModelValidator;
import jakarta.inject.Inject;
import picocli.CommandLine;
import java.util.Collections;
@CommandLine.Command(
name = "validate",
description = "Validate a template"
)
@TemplateEnabled
public class TemplateValidateCommand extends AbstractValidateCommand {
@Inject
private ModelValidator modelValidator;
@Override
public Integer call() throws Exception {
return this.call(
Template.class,
modelValidator,
(Object object) -> {
Template template = (Template) object;
return template.getNamespace() + " / " + template.getId();
},
(Object object) -> Collections.emptyList(),
(Object object) -> Collections.emptyList()
);
}
}

View File

@@ -1,31 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.core.models.templates.TemplateEnabled;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "namespace",
description = "Manage namespace templates",
mixinStandardHelpOptions = true,
subcommands = {
TemplateNamespaceUpdateCommand.class,
}
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "template", "namespace", "--help");
return 0;
}
}

View File

@@ -1,74 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.kestra.cli.AbstractValidateCommand;
import io.kestra.cli.commands.AbstractServiceNamespaceUpdateCommand;
import io.kestra.cli.services.TenantIdSelectorService;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.templates.TemplateEnabled;
import io.kestra.core.serializers.YamlParser;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.netty.DefaultHttpClient;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import java.nio.file.Files;
import java.util.List;
import jakarta.validation.ConstraintViolationException;
@CommandLine.Command(
name = "update",
description = "Update namespace templates",
mixinStandardHelpOptions = true
)
@Slf4j
@TemplateEnabled
public class TemplateNamespaceUpdateCommand extends AbstractServiceNamespaceUpdateCommand {
@Inject
private TenantIdSelectorService tenantService;
@Override
public Integer call() throws Exception {
super.call();
try (var files = Files.walk(directory)) {
List<Template> templates = files
.filter(Files::isRegularFile)
.filter(YamlParser::isValidExtension)
.map(path -> YamlParser.parse(path.toFile(), Template.class))
.toList();
if (templates.isEmpty()) {
stdOut("No template found on '{}'", directory.toFile().getAbsolutePath());
}
try (DefaultHttpClient client = client()) {
MutableHttpRequest<List<Template>> request = HttpRequest
.POST(apiUri("/templates/", tenantService.getTenantIdAndAllowEETenants(tenantId)) + namespace + "?delete=" + delete, templates);
List<UpdateResult> updated = client.toBlocking().retrieve(
this.requestOptions(request),
Argument.listOf(UpdateResult.class)
);
stdOut(updated.size() + " template(s) for namespace '" + namespace + "' successfully updated !");
updated.forEach(template -> stdOut("- " + template.getNamespace() + "." + template.getId()));
} catch (HttpClientResponseException e) {
AbstractValidateCommand.handleHttpException(e, "template");
return 1;
}
} catch (ConstraintViolationException e) {
AbstractValidateCommand.handleException(e, "template");
return 1;
}
return 0;
}
}

View File

@@ -243,6 +243,10 @@ kestra:
ui-anonymous-usage-report:
enabled: true
ui:
charts:
default-duration: P30D
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/server-events

View File

@@ -14,7 +14,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class FlowDotCommandTest {
@Test
void run() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
URL directory = FlowDotCommandTest.class.getClassLoader().getResource("flows/same/first.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));

View File

@@ -1,41 +0,0 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class FlowExpandCommandTest {
@SuppressWarnings("deprecation")
@Test
void run() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {
"src/test/resources/helper/include.yaml"
};
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).isEqualTo("id: include\n" +
"namespace: io.kestra.cli\n" +
"\n" +
"# The list of tasks\n" +
"tasks:\n" +
"- id: t1\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: \"Lorem ipsum dolor sit amet\"\n" +
"- id: t2\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: |\n" +
" Lorem ipsum dolor sit amet\n" +
" Lorem ipsum dolor sit amet\n");
}
}
}

View File

@@ -61,7 +61,6 @@ class FlowValidateCommandTest {
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - system / warning");
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
assertThat(out.toString()).contains(" - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
}
}

View File

@@ -1,62 +0,0 @@
package io.kestra.cli.commands.flows;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalids/empty.yaml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--plugins",
"/tmp", // pass this arg because it can cause failure
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -0,0 +1,147 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.App;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.StorageObject;
import io.kestra.core.storages.kv.*;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.NonNull;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
public class KvMetadataMigrationCommandTest {
@Test
void run() throws IOException, ResourceExpiredException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
/* Initial setup:
* - namespace 1: key, description, value
* - namespace 1: expiredKey
* - namespace 2: anotherKey, anotherDescription
* - Nothing in database */
String namespace = TestsUtils.randomNamespace();
String key = "myKey";
StorageInterface storage = ctx.getBean(StorageInterface.class);
String description = "Some description";
String value = "someValue";
putOldKv(storage, namespace, key, description, value);
String anotherNamespace = TestsUtils.randomNamespace();
String anotherKey = "anotherKey";
String anotherDescription = "another description";
putOldKv(storage, anotherNamespace, anotherKey, anotherDescription, "anotherValue");
String tenantId = TenantService.MAIN_TENANT;
// Expired KV should not be migrated + should be purged from the storage
String expiredKey = "expiredKey";
putOldKv(storage, namespace, expiredKey, Instant.now().minus(Duration.ofMinutes(5)), "some expired description", "expiredValue");
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isTrue();
KvMetadataRepositoryInterface kvMetadataRepository = ctx.getBean(KvMetadataRepositoryInterface.class);
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
/* Expected outcome from the migration command:
* - no KV has been migrated because no flow exist in the namespace so they are not picked up because we don't know they exist */
String[] kvMetadataMigrationCommand = {
"migrate", "metadata", "kv"
};
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
// Still it's not in the metadata repository because no flow exist to find that kv
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
// A flow is created from namespace 1, so the KV in this namespace should be migrated
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
flowRepository.create(GenericFlow.of(Flow.builder()
.tenantId(tenantId)
.id("a-flow")
.namespace(namespace)
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build()));
/* We run the migration again:
* - namespace 1 KV is seen and metadata is migrated to database
* - namespace 2 KV is not seen because no flow exist in this namespace
* - expiredKey is deleted from storage and not migrated */
out.reset();
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
Optional<PersistedKvMetadata> foundKv = kvMetadataRepository.findByName(tenantId, namespace, key);
assertThat(foundKv.isPresent()).isTrue();
assertThat(foundKv.get().getDescription()).isEqualTo(description);
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
KVStore kvStore = new InternalKVStore(tenantId, namespace, storage, kvMetadataRepository);
Optional<KVEntry> actualKv = kvStore.get(key);
assertThat(actualKv.isPresent()).isTrue();
assertThat(actualKv.get().description()).isEqualTo(description);
Optional<KVValue> actualValue = kvStore.getValue(key);
assertThat(actualValue.isPresent()).isTrue();
assertThat(actualValue.get().value()).isEqualTo(value);
assertThat(kvMetadataRepository.findByName(tenantId, namespace, expiredKey).isPresent()).isFalse();
assertThat(storage.exists(tenantId, null, getKvStorageUri(namespace, expiredKey))).isFalse();
/* We run one last time the migration without any change to verify that we don't resave an existing metadata.
* It covers the case where user didn't perform the migrate command yet but they played and added some KV from the UI (so those ones will already be in metadata database). */
out.reset();
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
assertThat(out.toString()).contains("✅ KV Metadata migration complete.");
foundKv = kvMetadataRepository.findByName(tenantId, namespace, key);
assertThat(foundKv.get().getVersion()).isEqualTo(1);
}
}
private static void putOldKv(StorageInterface storage, String namespace, String key, String description, String value) throws IOException {
putOldKv(storage, namespace, key, Instant.now().plus(Duration.ofMinutes(5)), description, value);
}
private static void putOldKv(StorageInterface storage, String namespace, String key, Instant expirationDate, String description, String value) throws IOException {
URI kvStorageUri = getKvStorageUri(namespace, key);
KVValueAndMetadata kvValueAndMetadata = new KVValueAndMetadata(new KVMetadata(description, expirationDate), value);
storage.put(TenantService.MAIN_TENANT, namespace, kvStorageUri, new StorageObject(
kvValueAndMetadata.metadataAsMap(),
new ByteArrayInputStream(JacksonMapper.ofIon().writeValueAsBytes(kvValueAndMetadata.value()))
));
}
private static @NonNull URI getKvStorageUri(String namespace, String key) {
return URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace) + "/" + key + ".ion");
}
}

View File

@@ -0,0 +1,29 @@
package io.kestra.cli.commands.migrations.metadata;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
public class SecretsMetadataMigrationCommandTest {
@Test
void run() {
ByteArrayOutputStream err = new ByteArrayOutputStream();
System.setErr(new PrintStream(err));
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
String[] secretMetadataMigrationCommand = {
"migrate", "metadata", "secrets"
};
PicocliRunner.call(App.class, ctx, secretMetadataMigrationCommand);
assertThat(err.toString()).contains("❌ Secrets Metadata migration failed: Secret migration is not needed in the OSS version");
}
}
}

View File

@@ -1,27 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.cli.commands.sys.database.DatabaseCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra sys state-store");
}
}
}

View File

@@ -1,75 +0,0 @@
package io.kestra.cli.commands.sys.statestore;
import io.kestra.core.exceptions.MigrationRequiredException;
import io.kestra.core.exceptions.ResourceExpiredException;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Hashing;
import io.kestra.core.utils.Slugify;
import io.kestra.plugin.core.log.Log;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreMigrateCommandTest {
@Test
void runMigration() throws IOException, ResourceExpiredException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).environments("test").start()) {
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
Flow flow = Flow.builder()
.tenantId("my-tenant")
.id("a-flow")
.namespace("some.valid.namespace." + ((int) (Math.random() * 1000000)))
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
.build();
flowRepository.create(GenericFlow.of(flow));
StorageInterface storage = ctx.getBean(StorageInterface.class);
String tenantId = flow.getTenantId();
URI oldStateStoreUri = URI.create("/" + flow.getNamespace().replace(".", "/") + "/" + Slugify.of("a-flow") + "/states/my-state/" + Hashing.hashToString("my-taskrun-value") + "/sub-name");
storage.put(
tenantId,
flow.getNamespace(),
oldStateStoreUri,
new ByteArrayInputStream("my-value".getBytes())
);
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
"tenantId", tenantId,
"id", flow.getId(),
"namespace", flow.getNamespace()
)));
StateStore stateStore = new StateStore(runContext, true);
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));
String[] args = {};
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
assertThat(call).isZero();
}
}
}

View File

@@ -1,65 +0,0 @@
package io.kestra.cli.commands.templates;
import io.kestra.cli.commands.templates.namespaces.TemplateNamespaceUpdateCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import java.util.zip.ZipFile;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateExportCommandTest {
@Test
void run() throws IOException {
URL directory = TemplateExportCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
// we use the update command to add templates to extract
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
// then we export them
String[] exportArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"--namespace",
"io.kestra.tests",
"/tmp",
};
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/templates.zip");
assertThat(file.exists()).isTrue();
ZipFile zipFile = new ZipFile(file);
assertThat(zipFile.stream().count()).isEqualTo(3L);
file.delete();
}
}
}

View File

@@ -1,61 +0,0 @@
package io.kestra.cli.commands.templates;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
String[] args = {
"--local",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runServer() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
directory.getPath()
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -1,26 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceCommandTest {
@Test
void runWithNoParam() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.builder().deduceEnvironment(false).start()) {
String[] args = {};
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra template namespace");
}
}
}

View File

@@ -1,112 +0,0 @@
package io.kestra.cli.commands.templates.namespaces;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceUpdateCommandTest {
@Test
void run() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
}
}
@Test
void invalid() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("invalidsTemplates");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setErr(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
// assertThat(call, is(1));
assertThat(out.toString()).contains("Unable to parse templates");
assertThat(out.toString()).contains("must not be empty");
}
}
@Test
void runNoDelete() {
URL directory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates");
URL subDirectory = TemplateNamespaceUpdateCommandTest.class.getClassLoader().getResource("templates/templatesSubFolder");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));
try (ApplicationContext ctx = ApplicationContext.run(Map.of("kestra.templates.enabled", "true"), Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();
String[] args = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
directory.getPath(),
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString()).contains("3 template(s)");
String[] newArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.tests",
subDirectory.getPath(),
"--no-delete"
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
assertThat(out.toString()).contains("1 template(s)");
}
}
}

View File

@@ -3,8 +3,8 @@ namespace: system
tasks:
- id: deprecated
type: io.kestra.plugin.core.debug.Echo
format: Hello World
type: io.kestra.plugin.core.log.Log
message: Hello World
- id: alias
type: io.kestra.core.tasks.log.Log
message: I'm an alias

View File

@@ -4,7 +4,6 @@ import io.kestra.core.models.dashboards.Dashboard;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.PluginDefault;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.inject.Singleton;
@@ -36,7 +35,6 @@ public class JsonSchemaCache {
public JsonSchemaCache(final JsonSchemaGenerator jsonSchemaGenerator) {
this.jsonSchemaGenerator = Objects.requireNonNull(jsonSchemaGenerator, "JsonSchemaGenerator cannot be null");
registerClassForType(SchemaType.FLOW, Flow.class);
registerClassForType(SchemaType.TEMPLATE, Template.class);
registerClassForType(SchemaType.TASK, Task.class);
registerClassForType(SchemaType.TRIGGER, AbstractTrigger.class);
registerClassForType(SchemaType.PLUGINDEFAULT, PluginDefault.class);

View File

@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.charts.Chart;
@@ -64,7 +63,7 @@ import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@Slf4j
public class JsonSchemaGenerator {
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
@@ -277,8 +276,8 @@ public class JsonSchemaGenerator {
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
.with(Option.PLAIN_DEFINITION_KEYS)
.with(Option.ALLOF_CLEANUP_AT_THE_END);
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
@Override
@@ -320,7 +319,7 @@ public class JsonSchemaGenerator {
// inline some type
builder.forTypesInGeneral()
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
@Override
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
@@ -688,15 +687,6 @@ public class JsonSchemaGenerator {
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == ScheduleCondition.class) {
return getRegisteredPlugins()
.stream()
.flatMap(registeredPlugin -> registeredPlugin.getConditions().stream())
.filter(ScheduleCondition.class::isAssignableFrom)
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
.flatMap(clz -> safelyResolveSubtype(declaredType, clz, typeContext).stream())
.toList();
} else if (declaredType.getErasedType() == TaskRunner.class) {
return getRegisteredPlugins()
.stream()

View File

@@ -6,7 +6,6 @@ import io.kestra.core.utils.Enums;
public enum SchemaType {
FLOW,
TEMPLATE,
TASK,
TRIGGER,
PLUGINDEFAULT,

View File

@@ -91,11 +91,13 @@ public class HttpConfiguration {
@Deprecated
private final String proxyPassword;
@Schema(title = "The username for HTTP basic authentication.")
@Schema(title = "The username for HTTP basic authentication. " +
"Deprecated, use `auth` property with a `BasicAuthConfiguration` instance instead.")
@Deprecated
private final String basicAuthUser;
@Schema(title = "The password for HTTP basic authentication.")
@Schema(title = "The password for HTTP basic authentication. " +
"Deprecated, use `auth` property with a `BasicAuthConfiguration` instance instead.")
@Deprecated
private final String basicAuthPassword;

View File

@@ -0,0 +1,27 @@
package io.kestra.core.lock;
import io.kestra.core.models.HasUID;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.time.Instant;
import java.time.LocalDateTime;
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Lock implements HasUID {
private String category;
private String id;
private String owner;
private Instant createdAt;
@Override
public String uid() {
return IdUtils.fromParts(this.category, this.id);
}
}

View File

@@ -0,0 +1,13 @@
package io.kestra.core.lock;
import io.kestra.core.exceptions.KestraRuntimeException;
public class LockException extends KestraRuntimeException {
public LockException(String message) {
super(message);
}
public LockException(Throwable cause) {
super(cause);
}
}

View File

@@ -0,0 +1,195 @@
package io.kestra.core.lock;
import io.kestra.core.repositories.LockRepositoryInterface;
import io.kestra.core.server.ServerInstance;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
/**
* This service provides facility for executing Runnable and Callable tasks inside a lock.
* Note: it may be handy to provide a tryLock facility that, if locked, skips executing the Runnable or Callable and exits immediately.
*
* @implNote There is no expiry for locks, so a service may hold a lock infinitely until the service is restarted as the
* liveness mechanism releases all locks when the service is unreachable.
* This may be improved at some point by adding an expiry (for ex 30s) and running a thread that will periodically
* increase the expiry for all exiting locks. This should allow quicker recovery of zombie locks than relying on the liveness mechanism,
* as a service wanted to lock an expired lock would be able to take it over.
*/
@Slf4j
@Singleton
public class LockService {
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300);
private static final int DEFAULT_SLEEP_MS = 1;
private final LockRepositoryInterface lockRepository;
@Inject
public LockService(LockRepositoryInterface lockRepository) {
this.lockRepository = lockRepository;
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
* @see #doInLock(String, String, Duration, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Runnable runnable) {
doInLock(category, id, DEFAULT_TIMEOUT, runnable);
}
/**
* Executes a Runnable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
* @see #doInLock(String, String, Runnable)
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public void doInLock(String category, String id, Duration timeout, Runnable runnable) {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
runnable.run();
} finally {
unlock(category, id);
}
}
/**
* Attempts to execute the provided {@code runnable} within a lock.
* If the lock is already held by another process, the execution is skipped.
*
* @param category the category of the lock, e.g., 'executions'
* @param id the identifier of the lock within the specified category, e.g., an execution ID
* @param runnable the task to be executed if the lock is successfully acquired
*/
public void tryLock(String category, String id, Runnable runnable) {
if (lock(category, id, Duration.ZERO)) {
try {
runnable.run();
} finally {
unlock(category, id);
}
} else {
log.debug("Lock '{}'.'{}' already hold, skipping", category, id);
}
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the default lock timeout of 5mn.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Callable<T> callable) throws Exception {
return callInLock(category, id, DEFAULT_TIMEOUT, callable);
}
/**
* Executes a Callable inside a lock.
* If the lock is already taken, it will wait for at most the <code>timeout</code> duration.
*
* @param category lock category, ex 'executions'
* @param id identifier of the lock identity inside the category, ex an execution ID
* @param timeout how much time to wait for the lock if another process already holds the same lock
*
* @throws LockException if the lock cannot be hold before the timeout or the thread is interrupted.
*/
public <T> T callInLock(String category, String id, Duration timeout, Callable<T> callable) throws Exception {
if (!lock(category, id, timeout)) {
throw new LockException("Unable to hold the lock inside the configured timeout of " + timeout);
}
try {
return callable.call();
} finally {
unlock(category, id);
}
}
/**
* Release all locks hold by this service identifier.
*/
public List<Lock> releaseAllLocks(String serviceId) {
return lockRepository.deleteByOwner(serviceId);
}
/**
* @return true if the lock identified by this category and identifier already exist.
*/
public boolean isLocked(String category, String id) {
return lockRepository.findById(category, id).isPresent();
}
private boolean lock(String category, String id, Duration timeout) throws LockException {
log.debug("Locking '{}'.'{}'", category, id);
long deadline = System.currentTimeMillis() + timeout.toMillis();
do {
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
// we can try to lock!
Lock newLock = new Lock(category, id, ServerInstance.INSTANCE_ID, Instant.now());
if (lockRepository.create(newLock)) {
return true;
} else {
log.debug("Cannot create the lock, it may have been created after we check for its existence and before we create it");
}
} else {
log.debug("Already locked by: {}", existing.get().getOwner());
}
// fast path for when we don't want to wait for the lock
if (timeout.isZero()) {
return false;
}
try {
Thread.sleep(DEFAULT_SLEEP_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockException(e);
}
} while (System.currentTimeMillis() < deadline);
log.debug("Lock already hold, waiting for it to be released");
return false;
}
private void unlock(String category, String id) {
log.debug("Unlocking '{}'.'{}'", category, id);
Optional<Lock> existing = lockRepository.findById(category, id);
if (existing.isEmpty()) {
log.warn("Try to unlock unknown lock '{}'.'{}', ignoring it", category, id);
return;
}
if (!existing.get().getOwner().equals(ServerInstance.INSTANCE_ID)) {
log.warn("Try to unlock a lock we no longer own '{}'.'{}', ignoring it", category, id);
return;
}
lockRepository.deleteById(category, id);
}
}

View File

@@ -0,0 +1,7 @@
package io.kestra.core.models;
public enum FetchVersion {
LATEST,
OLD,
ALL
}

View File

@@ -100,7 +100,7 @@ public record QueryFilter(
LABELS("labels") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS);
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
}
},
FLOW_ID("flowId") {
@@ -109,6 +109,12 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
}
},
UPDATED("updated") {
@Override
public List<Op> supportedOp() {
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
}
},
START_DATE("startDate") {
@Override
public List<Op> supportedOp() {
@@ -259,6 +265,16 @@ public record QueryFilter(
Field.NAMESPACE
);
}
},
KV_METADATA {
@Override
public List<Field> supportedField() {
return List.of(
Field.QUERY,
Field.NAMESPACE,
Field.UPDATED
);
}
};
public abstract List<Field> supportedField();

View File

@@ -0,0 +1,3 @@
package io.kestra.core.models;
public record TenantAndNamespace(String tenantId, String namespace) {}

View File

@@ -2,7 +2,11 @@ package io.kestra.core.models.conditions;
import io.kestra.core.exceptions.InternalException;
/**
* Conditions of type ScheduleCondition have a special behavior inside the {@link io.kestra.plugin.core.trigger.Schedule} trigger.
* They are evaluated specifically and would be taken into account when computing the next evaluation date.
* Only conditions based on date should be marked as ScheduleCondition.
*/
public interface ScheduleCondition {
boolean test(ConditionContext conditionContext) throws InternalException;
}

View File

@@ -28,6 +28,7 @@ import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -77,10 +78,12 @@ public class Execution implements DeletedInterface, TenantInterface {
@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Schema(implementation = Object.class)
Map<String, Object> inputs;
@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Schema(implementation = Object.class)
Map<String, Object> outputs;
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@@ -88,6 +91,7 @@ public class Execution implements DeletedInterface, TenantInterface {
List<Label> labels;
@With
@Schema(implementation = Object.class)
Map<String, Object> variables;
@NotNull
@@ -941,7 +945,15 @@ public class Execution implements DeletedInterface, TenantInterface {
for (TaskRun current : taskRuns) {
if (!MapUtils.isEmpty(current.getOutputs())) {
if (current.getIteration() != null) {
taskOutputs = MapUtils.merge(taskOutputs, outputs(current, byIds));
Map<String, Object> merged = MapUtils.merge(taskOutputs, outputs(current, byIds));
// If one of two of the map is null in the merge() method, we just return the other
// And if the not null map is a Variables (= read only), we cast it back to a simple
// hashmap to avoid taskOutputs becoming read-only
// i.e this happen in nested loopUntil tasks
if (merged instanceof Variables) {
merged = new HashMap<>(merged);
}
taskOutputs = merged;
} else {
taskOutputs.putAll(outputs(current, byIds));
}

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@@ -55,6 +56,7 @@ public class TaskRun implements TenantInterface {
@With
@JsonInclude(JsonInclude.Include.ALWAYS)
@Nullable
@Schema(implementation = Object.class)
Variables outputs;
@NotNull
@@ -195,17 +197,17 @@ public class TaskRun implements TenantInterface {
taskRunBuilder.attempts = new ArrayList<>();
taskRunBuilder.attempts.add(TaskRunAttempt.builder()
.state(new State(this.state, State.Type.KILLED))
.state(new State(this.state, State.Type.RESUBMITTED))
.build()
);
} else {
ArrayList<TaskRunAttempt> taskRunAttempts = new ArrayList<>(taskRunBuilder.attempts);
TaskRunAttempt lastAttempt = taskRunAttempts.get(taskRunBuilder.attempts.size() - 1);
if (!lastAttempt.getState().isTerminated()) {
taskRunAttempts.set(taskRunBuilder.attempts.size() - 1, lastAttempt.withState(State.Type.KILLED));
taskRunAttempts.set(taskRunBuilder.attempts.size() - 1, lastAttempt.withState(State.Type.RESUBMITTED));
} else {
taskRunAttempts.add(TaskRunAttempt.builder()
.state(new State().withState(State.Type.KILLED))
.state(new State().withState(State.Type.RESUBMITTED))
.build()
);
}

View File

@@ -12,7 +12,6 @@ import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.sla.SLA;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
@@ -85,10 +84,6 @@ public class Flow extends AbstractFlow implements HasUID {
return this._finally;
}
@Valid
@Deprecated
List<Listener> listeners;
@Valid
List<Task> afterExecution;
@@ -98,20 +93,6 @@ public class Flow extends AbstractFlow implements HasUID {
@Valid
List<PluginDefault> pluginDefaults;
@Valid
List<PluginDefault> taskDefaults;
@Deprecated
public void setTaskDefaults(List<PluginDefault> taskDefaults) {
this.pluginDefaults = taskDefaults;
this.taskDefaults = taskDefaults;
}
@Deprecated
public List<PluginDefault> getTaskDefaults() {
return this.taskDefaults;
}
@Valid
Concurrency concurrency;
@@ -144,7 +125,7 @@ public class Flow extends AbstractFlow implements HasUID {
this.tasks != null ? this.tasks : Collections.<Task>emptyList(),
this.errors != null ? this.errors : Collections.<Task>emptyList(),
this._finally != null ? this._finally : Collections.<Task>emptyList(),
this.afterExecutionTasks()
this.afterExecution != null ? this.afterExecution : Collections.<Task>emptyList()
)
.flatMap(Collection::stream);
}
@@ -245,55 +226,6 @@ public class Flow extends AbstractFlow implements HasUID {
.orElse(null);
}
/**
* @deprecated should not be used
*/
@Deprecated(forRemoval = true, since = "0.21.0")
public Flow updateTask(String taskId, Task newValue) throws InternalException {
Task task = this.findTaskByTaskId(taskId);
Flow flow = this instanceof FlowWithSource flowWithSource ? flowWithSource.toFlow() : this;
Map<String, Object> map = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);
return NON_DEFAULT_OBJECT_MAPPER.convertValue(
recursiveUpdate(map, task, newValue),
Flow.class
);
}
private static Object recursiveUpdate(Object object, Task previous, Task newValue) {
if (object instanceof Map<?, ?> value) {
if (value.containsKey("id") && value.get("id").equals(previous.getId()) &&
value.containsKey("type") && value.get("type").equals(previous.getType())
) {
return NON_DEFAULT_OBJECT_MAPPER.convertValue(newValue, JacksonMapper.MAP_TYPE_REFERENCE);
} else {
return value
.entrySet()
.stream()
.map(e -> new AbstractMap.SimpleEntry<>(
e.getKey(),
recursiveUpdate(e.getValue(), previous, newValue)
))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
} else if (object instanceof Collection<?> value) {
return value
.stream()
.map(r -> recursiveUpdate(r, previous, newValue))
.toList();
} else {
return object;
}
}
private List<Task> afterExecutionTasks() {
return ListUtils.concat(
ListUtils.emptyOnNull(this.getListeners()).stream().flatMap(listener -> listener.getTasks().stream()).toList(),
this.getAfterExecution()
);
}
public boolean equalsWithoutRevision(FlowInterface o) {
try {
return WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(this).equals(WITHOUT_REVISION_OBJECT_MAPPER.writeValueAsString(o));

View File

@@ -19,7 +19,6 @@ public class FlowWithSource extends Flow {
String source;
@SuppressWarnings("deprecation")
public Flow toFlow() {
return Flow.builder()
.tenantId(this.tenantId)
@@ -34,7 +33,6 @@ public class FlowWithSource extends Flow {
.tasks(this.tasks)
.errors(this.errors)
._finally(this._finally)
.listeners(this.listeners)
.afterExecution(this.afterExecution)
.triggers(this.triggers)
.pluginDefaults(this.pluginDefaults)
@@ -60,7 +58,6 @@ public class FlowWithSource extends Flow {
.build();
}
@SuppressWarnings("deprecation")
public static FlowWithSource of(Flow flow, String source) {
return FlowWithSource.builder()
.tenantId(flow.tenantId)
@@ -76,7 +73,6 @@ public class FlowWithSource extends Flow {
.errors(flow.errors)
._finally(flow._finally)
.afterExecution(flow.afterExecution)
.listeners(flow.listeners)
.triggers(flow.triggers)
.pluginDefaults(flow.pluginDefaults)
.disabled(flow.disabled)

View File

@@ -1,10 +1,10 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
import io.kestra.core.models.property.Property;
import io.kestra.core.validations.InputValidation;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
@@ -25,7 +25,6 @@ import lombok.experimental.SuperBuilder;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
@JsonSubTypes({
@JsonSubTypes.Type(value = ArrayInput.class, name = "ARRAY"),
@JsonSubTypes.Type(value = BooleanInput.class, name = "BOOLEAN"),
@JsonSubTypes.Type(value = BoolInput.class, name = "BOOL"),
@JsonSubTypes.Type(value = DateInput.class, name = "DATE"),
@JsonSubTypes.Type(value = DateTimeInput.class, name = "DATETIME"),
@@ -36,7 +35,6 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = JsonInput.class, name = "JSON"),
@JsonSubTypes.Type(value = SecretInput.class, name = "SECRET"),
@JsonSubTypes.Type(value = StringInput.class, name = "STRING"),
@JsonSubTypes.Type(value = EnumInput.class, name = "ENUM"),
@JsonSubTypes.Type(value = SelectInput.class, name = "SELECT"),
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
@JsonSubTypes.Type(value = URIInput.class, name = "URI"),
@@ -44,6 +42,7 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes.Type(value = YamlInput.class, name = "YAML"),
@JsonSubTypes.Type(value = EmailInput.class, name = "EMAIL"),
})
@InputValidation
public abstract class Input<T> implements Data {
@Schema(
title = "The ID of the input."
@@ -53,9 +52,6 @@ public abstract class Input<T> implements Data {
@Pattern(regexp="^[a-zA-Z0-9][.a-zA-Z0-9_-]*")
String id;
@Deprecated
String name;
@Schema(
title = "The type of the input."
)
@@ -80,20 +76,17 @@ public abstract class Input<T> implements Data {
title = "The default value to use if no value is specified."
)
Property<T> defaults;
@Schema(
title = "The suggested value for the input.",
description = "Optional UI hint for pre-filling the input. Cannot be used together with a default value."
)
Property<T> prefill;
@Schema(
title = "The display name of the input."
)
String displayName;
public abstract void validate(T input) throws ConstraintViolationException;
@JsonSetter
public void setName(String name) {
if (this.id == null) {
this.id = name;
}
this.name = name;
}
}

View File

@@ -236,14 +236,15 @@ public class State {
RETRYING,
RETRIED,
SKIPPED,
BREAKPOINT;
BREAKPOINT,
RESUBMITTED;
public boolean isTerminated() {
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED;
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED || this == Type.RESUBMITTED;
}
public boolean isTerminatedNoFail() {
return this == Type.WARNING || this == Type.SUCCESS || this == Type.RETRIED || this == Type.SKIPPED;
return this == Type.WARNING || this == Type.SUCCESS || this == Type.RETRIED || this == Type.SKIPPED || this == Type.RESUBMITTED;
}
public boolean isCreated() {

View File

@@ -9,11 +9,9 @@ import io.micronaut.core.annotation.Introspected;
@Introspected
public enum Type {
STRING(StringInput.class.getName()),
ENUM(EnumInput.class.getName()),
SELECT(SelectInput.class.getName()),
INT(IntInput.class.getName()),
FLOAT(FloatInput.class.getName()),
BOOLEAN(BooleanInput.class.getName()),
BOOL(BoolInput.class.getName()),
DATETIME(DateTimeInput.class.getName()),
DATE(DateInput.class.getName()),

View File

@@ -1,19 +0,0 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import jakarta.validation.ConstraintViolationException;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class BooleanInput extends Input<Boolean> {
@Override
public void validate(Boolean input) throws ConstraintViolationException {
// no validation yet
}
}

View File

@@ -1,39 +0,0 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.validations.Regex;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class EnumInput extends Input<String> {
@Schema(
title = "List of values.",
description = "DEPRECATED; use 'SELECT' instead."
)
@NotNull
List<@Regex String> values;
@Override
public void validate(String input) throws ConstraintViolationException {
if (!values.contains(input) && this.getRequired()) {
throw ManualConstraintViolation.toConstraintViolationException(
"it must match the values `" + values + "`",
this,
EnumInput.class,
getId(),
input
);
}
}
}

View File

@@ -4,8 +4,6 @@ import java.util.Set;
import io.kestra.core.models.flows.Input;
import io.kestra.core.validations.FileInputValidation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@@ -19,17 +17,14 @@ import java.util.List;
@FileInputValidation
public class FileInput extends Input<URI> {
private static final String DEFAULT_EXTENSION = ".upl";
public static final String DEFAULT_EXTENSION = ".upl";
@Deprecated(since = "0.24", forRemoval = true)
public String extension;
/**
* List of allowed file extensions (e.g., [".csv", ".txt", ".pdf"]).
* Each extension must start with a dot.
*/
private List<String> allowedFileExtensions;
/**
* Gets the file extension from the URI's path
*/
@@ -53,15 +48,4 @@ public class FileInput extends Input<URI> {
);
}
}
public static String findFileInputExtension(@NotNull final List<Input<?>> inputs, @NotNull final String fileName) {
String res = inputs.stream()
.filter(in -> in instanceof FileInput)
.filter(in -> in.getId().equals(fileName))
.filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
.map(flowInput -> ((FileInput) flowInput).getExtension())
.findFirst()
.orElse(FileInput.DEFAULT_EXTENSION);
return res.startsWith(".") ? res : "." + res;
}
}

View File

@@ -1,12 +0,0 @@
package io.kestra.core.models.flows.sla;
import java.time.Instant;
import java.util.function.Consumer;
public interface SLAMonitorStorage {
void save(SLAMonitor slaMonitor);
void purge(String executionId);
void processExpired(Instant now, Consumer<SLAMonitor> consumer);
}

View File

@@ -0,0 +1,79 @@
package io.kestra.core.models.kv;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.storages.kv.KVEntry;
import io.kestra.core.utils.IdUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
import java.util.Optional;
@Builder(toBuilder = true)
@Slf4j
@Getter
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@AllArgsConstructor
@ToString
@EqualsAndHashCode
public class PersistedKvMetadata implements DeletedInterface, TenantInterface, HasUID {
@With
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
private String tenantId;
@NotNull
private String namespace;
@NotNull
private String name;
private String description;
@NotNull
private Integer version;
@Builder.Default
private boolean last = true;
@Nullable
private Instant expirationDate;
@Nullable
private Instant created;
@Nullable
private Instant updated;
private boolean deleted;
public static PersistedKvMetadata from(String tenantId, KVEntry kvEntry) {
return PersistedKvMetadata.builder()
.tenantId(tenantId)
.namespace(kvEntry.namespace())
.name(kvEntry.key())
.version(kvEntry.version())
.description(kvEntry.description())
.created(kvEntry.creationDate())
.updated(kvEntry.updateDate())
.expirationDate(kvEntry.expirationDate())
.build();
}
public PersistedKvMetadata asLast() {
Instant saveDate = Instant.now();
return this.toBuilder().created(Optional.ofNullable(this.created).orElse(saveDate)).updated(saveDate).last(true).build();
}
@Override
public String uid() {
return IdUtils.fromParts(getTenantId(), getNamespace(), getName(), getVersion().toString());
}
}

View File

@@ -1,25 +0,0 @@
package io.kestra.core.models.listeners;
import io.micronaut.core.annotation.Introspected;
import lombok.Builder;
import lombok.Value;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.tasks.Task;
import java.util.List;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
@Value
@Builder
@Introspected
public class Listener {
String description;
@Valid
List<Condition> conditions;
@Valid
@NotEmpty
List<Task> tasks;
}

View File

@@ -54,12 +54,7 @@ public class Property<T> {
private String expression;
private T value;
/**
* @deprecated use {@link #ofExpression(String)} instead.
*/
@Deprecated
// Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
public Property(String expression) {
private Property(String expression) {
this.expression = expression;
}
@@ -123,14 +118,6 @@ public class Property<T> {
return p;
}
/**
* @deprecated use {@link #ofValue(Object)} instead.
*/
@Deprecated
public static <V> Property<V> of(V value) {
return ofValue(value);
}
/**
* Build a new Property object with a Pebble expression.<br>
* <p>

View File

@@ -15,31 +15,10 @@ public class TaskException extends Exception {
private transient AbstractLogConsumer logConsumer;
/**
* This constructor will certainly be removed in 0.21 as we keep it only because all task runners must be impacted.
* @deprecated use {@link #TaskException(int, AbstractLogConsumer)} instead.
*/
@Deprecated(forRemoval = true, since = "0.20.0")
public TaskException(int exitCode, int stdOutCount, int stdErrCount) {
this("Command failed with exit code " + exitCode, exitCode, stdOutCount, stdErrCount);
}
public TaskException(int exitCode, AbstractLogConsumer logConsumer) {
this("Command failed with exit code " + exitCode, exitCode, logConsumer);
}
/**
* This constructor will certainly be removed in 0.21 as we keep it only because all task runners must be impacted.
* @deprecated use {@link #TaskException(String, int, AbstractLogConsumer)} instead.
*/
@Deprecated(forRemoval = true, since = "0.20.0")
public TaskException(String message, int exitCode, int stdOutCount, int stdErrCount) {
super(message);
this.exitCode = exitCode;
this.stdOutCount = stdOutCount;
this.stdErrCount = stdErrCount;
}
public TaskException(String message, int exitCode, AbstractLogConsumer logConsumer) {
super(message);
this.exitCode = exitCode;

View File

@@ -1,156 +0,0 @@
package io.kestra.core.models.templates;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.validations.ManualConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.Hidden;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.*;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class Template implements DeletedInterface, TenantInterface, HasUID {
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml().copy()
.setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
@Override
public boolean hasIgnoreMarker(final AnnotatedMember m) {
List<String> exclusions = Arrays.asList("revision", "deleted", "source");
return exclusions.contains(m.getName()) || super.hasIgnoreMarker(m);
}
})
.setDefaultPropertyInclusion(JsonInclude.Include.NON_DEFAULT);
@Setter
@Hidden
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
private String tenantId;
@NotNull
@NotBlank
@Pattern(regexp = "^[a-zA-Z0-9][a-zA-Z0-9._-]*")
private String id;
@NotNull
@Pattern(regexp="^[a-z0-9][a-z0-9._-]*")
private String namespace;
String description;
@Valid
@NotEmpty
private List<Task> tasks;
@Valid
private List<Task> errors;
@Valid
@JsonProperty("finally")
@Getter(AccessLevel.NONE)
protected List<Task> _finally;
public List<Task> getFinally() {
return this._finally;
}
@NotNull
@Builder.Default
private final boolean deleted = false;
/** {@inheritDoc **/
@Override
@JsonIgnore
public String uid() {
return Template.uid(
this.getTenantId(),
this.getNamespace(),
this.getId()
);
}
@JsonIgnore
public static String uid(String tenantId, String namespace, String id) {
return IdUtils.fromParts(
tenantId,
namespace,
id
);
}
public Optional<ConstraintViolationException> validateUpdate(Template updated) {
Set<ConstraintViolation<?>> violations = new HashSet<>();
if (!updated.getId().equals(this.getId())) {
violations.add(ManualConstraintViolation.of(
"Illegal template id update",
updated,
Template.class,
"template.id",
updated.getId()
));
}
if (!updated.getNamespace().equals(this.getNamespace())) {
violations.add(ManualConstraintViolation.of(
"Illegal namespace update",
updated,
Template.class,
"template.namespace",
updated.getNamespace()
));
}
if (!violations.isEmpty()) {
return Optional.of(new ConstraintViolationException(violations));
} else {
return Optional.empty();
}
}
public String generateSource() {
try {
return YAML_MAPPER.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public Template toDeleted() {
return new Template(
this.tenantId,
this.id,
this.namespace,
this.description,
this.tasks,
this.errors,
this._finally,
true
);
}
}

View File

@@ -1,15 +0,0 @@
package io.kestra.core.models.templates;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.StringUtils;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(property = "kestra.templates.enabled", value = StringUtils.TRUE, defaultValue = StringUtils.FALSE)
@Inherited
public @interface TemplateEnabled {
}

View File

@@ -1,18 +0,0 @@
package io.kestra.core.models.templates;
import io.micronaut.core.annotation.Introspected;
import lombok.*;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
@SuperBuilder
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@EqualsAndHashCode
public class TemplateSource extends Template {
String source;
String exception;
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.plugins;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Builder;
import java.io.File;
@@ -33,7 +34,7 @@ public record PluginArtifact(
String version,
URI uri
) implements Comparable<PluginArtifact> {
private static final Pattern ARTIFACT_PATTERN = Pattern.compile(
"([^: ]+):([^: ]+)(:([^: ]*)(:([^: ]+))?)?:([^: ]+)"
);
@@ -42,7 +43,8 @@ public record PluginArtifact(
);
public static final String JAR_EXTENSION = "jar";
public static final String KESTRA_GROUP_ID = "io.kestra";
/**
* Static helper method for constructing a new {@link PluginArtifact} from a JAR file.
*
@@ -135,6 +137,11 @@ public record PluginArtifact(
public String toString() {
return toCoordinates();
}
@JsonIgnore
public boolean isOfficial() {
return groupId.startsWith(KESTRA_GROUP_ID);
}
public String toCoordinates() {
return Stream.of(groupId, artifactId, extension, classifier, version)

View File

@@ -1,9 +1,13 @@
package io.kestra.core.plugins;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.Version;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.HttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -15,9 +19,12 @@ import java.util.Base64;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Services for retrieving available plugin artifacts for Kestra.
@@ -39,6 +46,8 @@ public class PluginCatalogService {
private final boolean icons;
private final boolean oss;
private final Version currentStableVersion;
/**
* Creates a new {@link PluginCatalogService} instance.
@@ -53,11 +62,55 @@ public class PluginCatalogService {
this.httpClient = httpClient;
this.icons = icons;
this.oss = communityOnly;
Version version = Version.of(KestraContext.getContext().getVersion());
this.currentStableVersion = new Version(version.majorVersion(), version.minorVersion(), version.patchVersion(), null);
// Immediately trigger an async load of plugin artifacts.
this.isLoaded.set(true);
this.plugins = CompletableFuture.supplyAsync(this::load);
}
/**
* Resolves the version for the given artifacts.
*
* @param artifacts The list of artifacts to resolve.
* @return The list of results.
*/
public List<PluginResolutionResult> resolveVersions(List<PluginArtifact> artifacts) {
if (ListUtils.isEmpty(artifacts)) {
return List.of();
}
final Map<String, ApiPluginArtifact> pluginsByGroupAndArtifactId = getAllCompatiblePlugins().stream()
.collect(Collectors.toMap(it -> it.groupId() + ":" + it.artifactId(), Function.identity()));
return artifacts.stream().map(it -> {
// Get all compatible versions for current artifact
List<String> versions = Optional
.ofNullable(pluginsByGroupAndArtifactId.get(it.groupId() + ":" + it.artifactId()))
.map(ApiPluginArtifact::versions)
.orElse(List.of());
// Try to resolve the version
String resolvedVersion = null;
if (!versions.isEmpty()) {
if (it.version().equalsIgnoreCase("LATEST")) {
resolvedVersion = versions.getFirst();
} else {
resolvedVersion = versions.contains(it.version()) ? it.version() : null;
}
}
// Build the PluginResolutionResult
return new PluginResolutionResult(
it,
resolvedVersion,
versions,
resolvedVersion != null
);
}).toList();
}
public synchronized List<PluginManifest> get() {
try {
@@ -140,7 +193,27 @@ public class PluginCatalogService {
isLoaded.set(false);
}
}
private List<ApiPluginArtifact> getAllCompatiblePlugins() {
MutableHttpRequest<Object> request = HttpRequest.create(
HttpMethod.GET,
"/v1/plugins/artifacts/core-compatibility/" + currentStableVersion
);
if (oss) {
request.getParameters().add("license", "OPENSOURCE");
}
try {
return httpClient
.toBlocking()
.exchange(request, Argument.listOf(ApiPluginArtifact.class))
.body();
} catch (Exception e) {
log.debug("Failed to retrieve available plugins from Kestra API. Cause: ", e);
return List.of();
}
}
public record PluginManifest(
String title,
String icon,
@@ -153,4 +226,11 @@ public class PluginCatalogService {
return groupId + ":" + artifactId + ":LATEST";
}
}
public record ApiPluginArtifact(
String groupId,
String artifactId,
String license,
List<String> versions
) {}
}

View File

@@ -5,22 +5,19 @@ import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.*;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
String EXECUTOR_NAMED = "executorQueue";
String EXECUTION_EVENT_NAMED = "executionEventQueue";
String WORKERJOB_NAMED = "workerJobQueue";
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";
String WORKERTRIGGERRESULT_NAMED = "workerTriggerResultQueue";
String FLOW_NAMED = "flowQueue";
String TEMPLATE_NAMED = "templateQueue";
String WORKERTASKLOG_NAMED = "workerTaskLogQueue";
String METRIC_QUEUE = "workerTaskMetricQueue";
String KILL_NAMED = "executionKilledQueue";
String WORKERINSTANCE_NAMED = "workerInstanceQueue";
String WORKERJOBRUNNING_NAMED = "workerJobRunningQueue";
String TRIGGER_NAMED = "triggerQueue";
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
@@ -30,7 +27,7 @@ public interface QueueFactoryInterface {
QueueInterface<Execution> execution();
QueueInterface<Executor> executor();
QueueInterface<ExecutionEvent> executionEvent();
WorkerJobQueueInterface workerJob();
@@ -46,10 +43,6 @@ public interface QueueFactoryInterface {
QueueInterface<ExecutionKilled> kill();
QueueInterface<Template> template();
QueueInterface<WorkerInstance> workerInstance();
QueueInterface<WorkerJobRunning> workerJobRunning();
QueueInterface<Trigger> trigger();

View File

@@ -35,6 +35,24 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
void delete(String consumerGroup, T message) throws QueueException;
/**
* Delete all messages of the queue for this key.
* This is used to purge a queue for a specific key.
* A queue implementation may omit to implement it and purge records differently.
*/
default void deleteByKey(String key) throws QueueException {
// by default do nothing
}
/**
* Delete all messages of the queue for a set of keys.
* This is used to purge a queue for specific keys.
* A queue implementation may omit to implement it and purge records differently.
*/
default void deleteByKeys(List<String> keys) throws QueueException {
// by default do nothing
}
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
return receive(null, consumer, false);
}
@@ -54,4 +72,20 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
}
Runnable receive(String consumerGroup, Class<?> queueType, Consumer<Either<T, DeserializationException>> consumer, boolean forUpdate);
default Runnable receiveBatch(Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(null, queueType, consumer);
}
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer) {
return receiveBatch(consumerGroup, queueType, consumer, true);
}
/**
* Consumer a batch of messages.
* By default, it consumes a single message, a queue implementation may implement it to support batch consumption.
*/
default Runnable receiveBatch(String consumerGroup, Class<?> queueType, Consumer<List<Either<T, DeserializationException>>> consumer, boolean forUpdate) {
return receive(consumerGroup, either -> consumer.accept(List.of(either)), forUpdate);
}
}

View File

@@ -19,12 +19,8 @@ public class QueueService {
return ((SubflowExecution<?>) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionResult.class) {
return ((SubflowExecutionResult) object).getExecutionId();
} else if (object.getClass() == ExecutorState.class) {
return ((ExecutorState) object).getExecutionId();
} else if (object.getClass() == Setting.class) {
return ((Setting) object).getKey();
} else if (object.getClass() == Executor.class) {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == MetricEntry.class) {
return null;
} else if (object.getClass() == SubflowExecutionEnd.class) {

View File

@@ -14,19 +14,19 @@ import java.time.Instant;
@Requires(property = "kestra.server-type")
@Slf4j
public class ReportableScheduler {
private final ReportableRegistry registry;
private final ServerEventSender sender;
private final Clock clock;
@Inject
public ReportableScheduler(ReportableRegistry registry, ServerEventSender sender) {
this.registry = registry;
this.sender = sender;
this.clock = Clock.systemDefaultZone();
}
@Scheduled(fixedDelay = "5m", initialDelay = "${kestra.anonymous-usage-report.initial-delay}")
@Scheduled(fixedDelay = "5m", initialDelay = "${kestra.anonymous-usage-report.initial-delay:5m}")
public void tick() {
Instant now = clock.instant();
for (Reportable<?> r : registry.getAll()) {

View File

@@ -18,6 +18,7 @@ import io.micronaut.http.hateoas.JsonError;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
@@ -28,29 +29,30 @@ import java.util.UUID;
@Singleton
@Slf4j
public class ServerEventSender {
private static final String SESSION_UUID = IdUtils.create();
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();
@Inject
@Client
private ReactorHttpClient client;
@Inject
private VersionProvider versionProvider;
@Inject
private InstanceService instanceService;
private final ServerType serverType;
@Value("${kestra.anonymous-usage-report.uri}")
@Setter
@Value("${kestra.anonymous-usage-report.uri:'https://api.kestra.io/v1/reports/server-events'}")
protected URI url;
public ServerEventSender( ) {
this.serverType = KestraContext.getContext().getServerType();
}
public void send(final Instant now, final Type type, Object event) {
ServerEvent serverEvent = ServerEvent
.builder()
@@ -65,11 +67,11 @@ public class ServerEventSender {
.build();
try {
MutableHttpRequest<ServerEvent> request = this.request(serverEvent, type);
if (log.isTraceEnabled()) {
log.trace("Report anonymous usage: '{}'", OBJECT_MAPPER.writeValueAsString(serverEvent));
}
this.handleResponse(client.toBlocking().retrieve(request, Argument.of(Result.class), Argument.of(JsonError.class)));
} catch (HttpClientResponseException t) {
log.trace("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
@@ -77,11 +79,11 @@ public class ServerEventSender {
log.trace("Unable to handle anonymous usage", t);
}
}
private void handleResponse (Result result){
}
protected MutableHttpRequest<ServerEvent> request(ServerEvent event, Type type) throws Exception {
URI baseUri = URI.create(this.url.toString().endsWith("/") ? this.url.toString() : this.url + "/");
URI resolvedUri = baseUri.resolve(type.name().toLowerCase());

View File

@@ -0,0 +1,25 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.ConcurrencyLimit;
import jakarta.validation.constraints.NotNull;
import java.util.List;
import java.util.Optional;
public interface ConcurrencyLimitRepositoryInterface {
/**
* Update a concurrency limit
* WARNING: this is inherently unsafe and must only be used for administration
*/
ConcurrencyLimit update(ConcurrencyLimit concurrencyLimit);
/**
* Returns all concurrency limits from the database for a given tenant
*/
List<ConcurrencyLimit> find(String tenantId);
/**
* Find a concurrency limit by its id
*/
Optional<ConcurrencyLimit> findById(@NotNull String tenantId, @NotNull String namespace, @NotNull String flowId);
}

View File

@@ -2,7 +2,6 @@ package io.kestra.core.repositories;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.executions.statistics.Flow;

View File

@@ -1,6 +1,7 @@
package io.kestra.core.repositories;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.topologies.FlowTopology;
import java.util.List;
@@ -13,4 +14,6 @@ public interface FlowTopologyRepositoryInterface {
List<FlowTopology> findAll(String tenantId);
FlowTopology save(FlowTopology flowTopology);
void save(FlowInterface flow, List<FlowTopology> flowTopologies);
}

View File

@@ -0,0 +1,48 @@
package io.kestra.core.repositories;
import io.kestra.core.models.FetchVersion;
import io.kestra.core.models.QueryFilter;
import io.kestra.core.models.kv.PersistedKvMetadata;
import io.micronaut.data.model.Pageable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
public interface KvMetadataRepositoryInterface extends SaveRepositoryInterface<PersistedKvMetadata> {
Optional<PersistedKvMetadata> findByName(
String tenantId,
String namespace,
String name
) throws IOException;
default ArrayListTotal<PersistedKvMetadata> find(
Pageable pageable,
String tenantId,
List<QueryFilter> filters,
boolean allowDeleted,
boolean allowExpired
) {
return this.find(pageable, tenantId, filters, allowDeleted, allowExpired, FetchVersion.LATEST);
}
ArrayListTotal<PersistedKvMetadata> find(
Pageable pageable,
String tenantId,
List<QueryFilter> filters,
boolean allowDeleted,
boolean allowExpired,
FetchVersion fetchBehavior
);
default PersistedKvMetadata delete(PersistedKvMetadata persistedKvMetadata) throws IOException {
return this.save(persistedKvMetadata.toBuilder().deleted(true).build());
}
/**
* Purge (hard delete) a list of persisted kv metadata. If no version is specified, all versions are purged.
* @param persistedKvsMetadata the list of persisted kv metadata to purge
* @return the number of purged persisted kv metadata
*/
Integer purge(List<PersistedKvMetadata> persistedKvsMetadata);
}

View File

@@ -0,0 +1,24 @@
package io.kestra.core.repositories;
import io.kestra.core.lock.Lock;
import java.util.List;
import java.util.Optional;
/**
* Low lever repository for locks.
* It should never be used directly but only via the {@link io.kestra.core.lock.LockService}.
*/
public interface LockRepositoryInterface {
Optional<Lock> findById(String category, String id);
boolean create(Lock newLock);
default void delete(Lock existing) {
deleteById(existing.getCategory(), existing.getId());
}
void deleteById(String category, String id);
List<Lock> deleteByOwner(String owner);
}

View File

@@ -5,7 +5,5 @@ import java.util.List;
public interface SaveRepositoryInterface<T> {
T save(T item);
default int saveBatch(List<T> items) {
throw new UnsupportedOperationException();
}
int saveBatch(List<T> items);
}

View File

@@ -1,7 +1,9 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.TransactionContext;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.ServiceStateTransition;
import io.kestra.core.server.ServiceType;
import io.micronaut.data.model.Pageable;
@@ -9,6 +11,7 @@ import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
/**
@@ -58,20 +61,6 @@ public interface ServiceInstanceRepositoryInterface {
*/
ServiceInstance save(ServiceInstance service);
/**
* Finds all service instances which are in the given state.
*
* @return the list of {@link ServiceInstance}.
*/
List<ServiceInstance> findAllInstancesInState(final Service.ServiceState state);
/**
* Finds all service instances which are in the given state.
*
* @return the list of {@link ServiceInstance}.
*/
List<ServiceInstance> findAllInstancesInStates(final Set<Service.ServiceState> states);
/**
* Finds all service active instances between the given dates.
*
@@ -84,6 +73,28 @@ public interface ServiceInstanceRepositoryInterface {
final Instant from,
final Instant to);
/**
* Finds all service instances which are NOT {@link Service.ServiceState#RUNNING}, then process them using the consumer.
*/
void processAllNonRunningInstances(BiConsumer<TransactionContext, ServiceInstance> consumer);
/**
* Attempt to transition the state of a given service to a given new state.
* This method may not update the service if the transition is not valid.
*
* @param instance the service instance.
* @param newState the new state of the service.
* @return an optional of the {@link ServiceInstance} or {@link Optional#empty()} if the service is not running.
*/
ServiceStateTransition.Response mayTransitServiceTo(final TransactionContext txContext,
final ServiceInstance instance,
final Service.ServiceState newState,
final String reason);
/**
* Finds all service instances that are in the states, then process them using the consumer.
*/
void processInstanceInStates(Set<Service.ServiceState> states, BiConsumer<TransactionContext, ServiceInstance> consumer);
/**
* Purge all instances in the EMPTY state older than the until date.
*

View File

@@ -1,42 +0,0 @@
package io.kestra.core.repositories;
import io.micronaut.data.model.Pageable;
import io.kestra.core.models.templates.Template;
import java.util.List;
import java.util.Optional;
import jakarta.annotation.Nullable;
public interface TemplateRepositoryInterface {
Optional<Template> findById(String tenantId, String namespace, String id);
List<Template> findAll(String tenantId);
List<Template> findAllWithNoAcl(String tenantId);
List<Template> findAllForAllTenants();
ArrayListTotal<Template> find(
Pageable pageable,
@Nullable String query,
@Nullable String tenantId,
@Nullable String namespace
);
// Should normally be TemplateWithSource but it didn't exist yet
List<Template> find(
@Nullable String query,
@Nullable String tenantId,
@Nullable String namespace
);
List<Template> findByNamespace(String tenantId, String namespace);
Template create(Template template);
Template update(Template template, Template previous);
void delete(Template template);
List<String> findDistinctNamespace(String tenantId);
}

View File

@@ -1,13 +0,0 @@
package io.kestra.core.repositories;
import io.kestra.core.runners.WorkerJobRunning;
import java.util.Optional;
public interface WorkerJobRunningRepositoryInterface {
Optional<WorkerJobRunning> findByKey(String uid);
void deleteByKey(String uid);
}

View File

@@ -1,9 +1,11 @@
package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.PluginDefaultService;
import jakarta.inject.Singleton;
import java.util.Objects;
import lombok.Setter;
@@ -15,12 +17,14 @@ import java.util.Optional;
@Singleton
public class DefaultFlowMetaStore implements FlowMetaStoreInterface {
private final FlowRepositoryInterface flowRepository;
private final PluginDefaultService pluginDefaultService;
@Setter
private List<FlowWithSource> allFlows;
public DefaultFlowMetaStore(FlowListenersInterface flowListeners, FlowRepositoryInterface flowRepository) {
public DefaultFlowMetaStore(FlowListenersInterface flowListeners, FlowRepositoryInterface flowRepository, PluginDefaultService pluginDefaultService) {
this.flowRepository = flowRepository;
this.pluginDefaultService = pluginDefaultService;
flowListeners.listen(flows -> allFlows = flows);
}
@@ -53,4 +57,9 @@ public class DefaultFlowMetaStore implements FlowMetaStoreInterface {
public Boolean isReady() {
return true;
}
@Override
public Optional<FlowWithSource> findByExecutionThenInjectDefaults(Execution execution) {
return findByExecution(execution).map(it -> pluginDefaultService.injectDefaults(it, execution));
}
}

View File

@@ -1,4 +1,4 @@
package io.kestra.jdbc.runner;
package io.kestra.core.runners;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.LogEntry;
@@ -9,7 +9,6 @@ import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.repositories.SaveRepositoryInterface;
import io.kestra.core.runners.Indexer;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.utils.IdUtils;
@@ -29,20 +28,20 @@ import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
// FIXME move that to a new indexer module
/**
* This class is responsible to batch-indexed asynchronously queue messages.<p>
* Some queue messages are indexed synchronously via the {@link JdbcQueueIndexer}.
* Some queue messages are indexed synchronously via the {@link QueueIndexer}.
*/
@SuppressWarnings("this-escape")
@Slf4j
@Singleton
@JdbcRunnerEnabled
public class JdbcIndexer implements Indexer {
public class DefaultIndexer implements Indexer {
private final LogRepositoryInterface logRepository;
private final JdbcQueue<LogEntry> logQueue;
private final QueueInterface<LogEntry> logQueue;
private final MetricRepositoryInterface metricRepository;
private final JdbcQueue<MetricEntry> metricQueue;
private final QueueInterface<MetricEntry> metricQueue;
private final MetricRegistry metricRegistry;
private final List<Runnable> receiveCancellations = new ArrayList<>();
@@ -56,7 +55,7 @@ public class JdbcIndexer implements Indexer {
private final QueueService queueService;
@Inject
public JdbcIndexer(
public DefaultIndexer(
LogRepositoryInterface logRepository,
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface<LogEntry> logQueue,
MetricRepositoryInterface metricRepositor,
@@ -67,9 +66,9 @@ public class JdbcIndexer implements Indexer {
QueueService queueService
) {
this.logRepository = logRepository;
this.logQueue = (JdbcQueue<LogEntry>) logQueue;
this.logQueue = logQueue;
this.metricRepository = metricRepositor;
this.metricQueue = (JdbcQueue<MetricEntry>) metricQueue;
this.metricQueue = metricQueue;
this.metricRegistry = metricRegistry;
this.eventPublisher = eventPublisher;
this.skipExecutionService = skipExecutionService;
@@ -91,7 +90,7 @@ public class JdbcIndexer implements Indexer {
this.sendBatch(metricQueue, metricRepository);
}
protected <T> void sendBatch(JdbcQueue<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
protected <T> void sendBatch(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
this.receiveCancellations.addFirst(queueInterface.receiveBatch(Indexer.class, eithers -> {
// first, log all deserialization issues
eithers.stream().filter(either -> either.isRight()).forEach(either -> log.error("unable to deserialize an item: {}", either.getRight().getMessage()));

View File

@@ -0,0 +1,89 @@
package io.kestra.core.runners;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
import io.micronaut.context.ApplicationContext;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
/**
* This class is responsible to index the queue synchronously at message production time. It is used by the Queue itself<p>
* Some queue messages are batch-indexed asynchronously via the regular {@link io.kestra.core.runners.Indexer}
* which listen to (receive) those queue messages.
*/
@Slf4j
@Singleton
public class DefaultQueueIndexer implements QueueIndexer {
private volatile Map<Class<?>, QueueIndexerRepository<?>> repositories;
private final MetricRegistry metricRegistry;
private final ApplicationContext applicationContext;
@Inject
public DefaultQueueIndexer(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
this.metricRegistry = applicationContext.getBean(MetricRegistry.class);
}
private Map<Class<?>, QueueIndexerRepository<?>> getRepositories() {
if (repositories == null) {
synchronized (this) {
if (repositories == null) {
repositories = new HashMap<>();
applicationContext.getBeansOfType(QueueIndexerRepository.class)
.forEach(saveRepositoryInterface -> {
repositories.put(saveRepositoryInterface.getItemClass(), saveRepositoryInterface);
});
}
}
}
return repositories;
}
// FIXME this today limit this indexer to only JDBC queue and repository.
// to be able to use JDBC queue with another repository we would need to check in each QueueIndexerRepository if it's a Jdbc transaction before casting
@Override
public void accept(TransactionContext txContext, Object item) {
Map<Class<?>, QueueIndexerRepository<?>> repositories = getRepositories();
if (repositories.containsKey(item.getClass())) {
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_REQUEST_COUNT, MetricRegistry.METRIC_INDEXER_REQUEST_COUNT_DESCRIPTION, "type", item.getClass().getName()).increment();
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT, MetricRegistry.METRIC_INDEXER_MESSAGE_IN_COUNT_DESCRIPTION, "type", item.getClass().getName()).increment();
this.metricRegistry.timer(MetricRegistry.METRIC_INDEXER_REQUEST_DURATION, MetricRegistry.METRIC_INDEXER_REQUEST_DURATION_DESCRIPTION, "type", item.getClass().getName()).record(() -> {
QueueIndexerRepository<?> indexerRepository = repositories.get(item.getClass());
if (indexerRepository instanceof FlowTopologyRepositoryInterface) {
// we allow flow topology to fail indexation
try {
save(indexerRepository, txContext, item);
} catch (Exception e) {
log.error("Unable to index a flow topology, skipping it", e);
}
} else {
save(indexerRepository, txContext, cast(item));
}
this.metricRegistry.counter(MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT, MetricRegistry.METRIC_INDEXER_MESSAGE_OUT_COUNT_DESCRIPTION, "type", item.getClass().getName()).increment();
});
}
}
private void save(QueueIndexerRepository<?> indexerRepository, TransactionContext txContext, Object item) {
if (indexerRepository.supports(txContext.getClass())) {
indexerRepository.save(txContext, cast(item));
} else {
indexerRepository.save(cast(item));
}
}
@SuppressWarnings("unchecked")
private static <T> T cast(Object message) {
return (T) message;
}
}

View File

@@ -511,17 +511,6 @@ public class DefaultRunContext extends RunContext {
}
}
/**
* {@inheritDoc}
*/
@Override
@SuppressWarnings("unchecked")
public String tenantId() {
Map<String, String> flow = (Map<String, String>) this.getVariables().get("flow");
// normally only tests should not have the flow variable
return flow != null ? flow.get("tenantId") : null;
}
/**
* {@inheritDoc}
*/

View File

@@ -0,0 +1,17 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import java.time.Instant;
public record ExecutionEvent(String tenantId, String namespace, String flowId, String executionId, Instant eventDate, ExecutionEventType eventType) implements HasUID {
public ExecutionEvent(Execution execution, ExecutionEventType eventType) {
this(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId(), Instant.now(), eventType);
}
@Override
public String uid() {
return executionId;
}
}

View File

@@ -0,0 +1,7 @@
package io.kestra.core.runners;
public enum ExecutionEventType {
CREATED,
UPDATED,
TERMINATED,
}

View File

@@ -0,0 +1,29 @@
package io.kestra.core.runners;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.runners.TransactionContext;
import java.util.function.BiConsumer;
/**
* This state store is used by the {@link Executor} to handle execution queued by flow concurrency limit.
*/
public interface ExecutionQueuedStateStore {
/**
* remove a queued execution.
*/
void remove(Execution execution);
/**
* Save a queued execution.
*
* @implNote Implementors that support transaction must use the provided {@link TransactionContext} to attach to the current transaction.
*/
void save(TransactionContext txContext, ExecutionQueued executionQueued);
/**
* Pop a queued execution: remove the oldest one and process it with the provided consumer.
*/
void pop(String tenantId, String namespace, String flowId, BiConsumer<TransactionContext, Execution> consumer);
}

Some files were not shown because too many files have changed in this diff Show More