Compare commits

...

274 Commits

Author SHA1 Message Date
Loïc Mathieu
2adb008dfb chore(system): simulate FIFO processing for messages of the same execution inside the JDBC executor
The JDBC executor process execution and worker task result messages concurrently for performance.
But the first thing it does is to lock the execution to avoid incosistent processing of the same execution.

By simulating a FIFO queue on execution to avoid processing concurrently the same execution (but keep processing different execution concurrently) we improve performance for concurrent task processing as we avoid the cost of lock & wait on the DB.
2025-08-25 09:16:07 +02:00
Anna Geller
552b3d7476 docs: add agents guidelines (#10875) 2025-08-25 08:53:17 +02:00
Florian Hussonnois
795f9c9a17 fix(core): add missing equals/hashcode methods on UnitTest 2025-08-22 13:29:13 +02:00
Florian Hussonnois
df430ded61 fix(system): fix count in AbstractJdbcRepository 2025-08-22 13:29:13 +02:00
Roman Acevedo
a6844e0ecf ci: fix by making inputs accept both dispatch and callable 2025-08-22 11:13:58 +02:00
Roman Acevedo
f71574cfb5 ci: simplify docker ci and push minor semver (#10848)
This PR modify our existing CI to allow publishing our docker image with 2 semver tags.
Example: for a CI on a tag v0.24.99 it will push both tags v0.24.99 and v0.24.

When this CI is settled on this repo (after one micro release for example), I will do the same for EE.

What has been done:

merge docker.yml into workflow-publish-docker.yml
make workflow-publish-docker.yml handle both tags (releases) and develop CI
when in a tag CI, extract the minor version, push it as well as the full vMAJOR.MINOR.PATCH version (see the related issue Add Multiple Semantic Version (SemVer) Tags for Docker Images #10575)
2025-08-22 10:41:59 +02:00
Nicolas K.
c5341e56e9 fix(tests): flaky consumer test (#10853)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-22 09:37:43 +02:00
Piyush Bhaskar
88b0723147 fix(flows): fixes revision restore for a flow (#10841) 2025-08-22 12:50:13 +05:30
Piyush Bhaskar
f79fcf5734 chore(versions) : bump the ui-libs (#10862) 2025-08-22 12:45:12 +05:30
Barthélémy Ledoux
cf27827f20 fix(core): when refreshing a multipanel editor, sizes are not kept (#10858) 2025-08-22 12:16:51 +05:30
Barthélémy Ledoux
408b6b97a7 fix(flows): in no-code refreshing a message will update its value (#10851) 2025-08-21 22:32:39 +02:00
Piyush Bhaskar
d57753e62b fix(core): Choose File button and hover on btn text in light theme (#10857) 2025-08-21 23:20:27 +05:30
Nicolas K.
2571eaf56c fix: #4442 extract tenant id from file path (#10850)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-21 17:40:10 +02:00
Florian Hussonnois
37ea7f31a0 feat(flows): add pebble expression support for all input defaults (#9762)
Fix: #9762
2025-08-21 17:14:15 +02:00
Roman Acevedo
478c911718 fix(tests): a TestSuite with any ERROR could not end in ERROR 2025-08-21 16:43:16 +02:00
Piyush Bhaskar
1bce0d673f fix(core): update params for flow navigation (#10847) 2025-08-21 19:43:26 +05:30
Florian Hussonnois
609a5b8066 feat(flow): add support for optional flow outputs
Add the new required property to the flow output
model. By default, all flow's outputs are required

Fixes: kestra-io/kestra-ee#3969
2025-08-21 16:09:22 +02:00
Florian Hussonnois
6182015a6f feat(system): report additional server events
Part-of: kestra-io/kestra-ee#3014
2025-08-21 14:50:11 +02:00
brian.mulier
6f8044f347 fix(ai): make sure accept / decline AI banner doesn't hide code editor (#10835)
closes kestra-io/kestra-ee#4273
2025-08-21 14:45:32 +02:00
brian.mulier
b3b7596bf4 fix(ai): AI Copilot instructions for better results (#10835)
closes kestra-io/kestra-ee#4273
2025-08-21 14:45:32 +02:00
brian.mulier
36b1c14424 fix(ai): add instructions for AI Copilot configuration if not enabled yet (#10835)
closes kestra-io/kestra-ee#4273
2025-08-21 14:45:32 +02:00
brian-mulier-p
1aef9578d9 fix(kv): Set task should convert numbers to string if kvType == STRING (#10836) 2025-08-21 09:33:03 +02:00
Piyush Bhaskar
6a07e3c048 fix(core): truncate the overflowing text from button when zoomed #10775 2025-08-21 01:14:28 +05:30
Owen Warnack
b643954921 fix(ui): show lock icon for namespace in No-Code editor (#10667)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-08-21 00:06:23 +05:30
Piyush Bhaskar
fe1ae290d0 fix(core): show validation button icon 2025-08-20 23:48:14 +05:30
Hamza
6ae2fde78f fix(core): truncate the overflowing text from button when zoomed (#10775)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-08-20 23:28:53 +05:30
Loïc Mathieu
260f5c427b fix(system): properly close the ScheduledExecutorService tasks
This avoids having running threads while the component is supposed to be closed.
2025-08-20 14:23:13 +02:00
dependabot[bot]
f2dbc41cdb build(deps): bump opensearchRestVersion from 3.1.0 to 3.2.0
Bumps `opensearchRestVersion` from 3.1.0 to 3.2.0.

Updates `org.opensearch.client:opensearch-rest-client` from 3.1.0 to 3.2.0
- [Release notes](https://github.com/opensearch-project/OpenSearch/releases)
- [Changelog](https://github.com/opensearch-project/OpenSearch/blob/main/CHANGELOG.md)
- [Commits](https://github.com/opensearch-project/OpenSearch/compare/3.1.0...3.2.0)

Updates `org.opensearch.client:opensearch-rest-high-level-client` from 3.1.0 to 3.2.0
- [Release notes](https://github.com/opensearch-project/OpenSearch/releases)
- [Changelog](https://github.com/opensearch-project/OpenSearch/blob/main/CHANGELOG.md)
- [Commits](https://github.com/opensearch-project/OpenSearch/compare/3.1.0...3.2.0)

---
updated-dependencies:
- dependency-name: org.opensearch.client:opensearch-rest-client
  dependency-version: 3.2.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
- dependency-name: org.opensearch.client:opensearch-rest-high-level-client
  dependency-version: 3.2.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-20 14:00:48 +02:00
dependabot[bot]
39fdb7ed5d build(deps): bump com.github.oshi:oshi-core from 6.8.2 to 6.8.3
Bumps [com.github.oshi:oshi-core](https://github.com/oshi/oshi) from 6.8.2 to 6.8.3.
- [Release notes](https://github.com/oshi/oshi/releases)
- [Changelog](https://github.com/oshi/oshi/blob/master/CHANGELOG.md)
- [Commits](https://github.com/oshi/oshi/compare/oshi-parent-6.8.2...oshi-parent-6.8.3)

---
updated-dependencies:
- dependency-name: com.github.oshi:oshi-core
  dependency-version: 6.8.3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-20 13:57:41 +02:00
dependabot[bot]
c6b9c445c5 build(deps): bump com.github.docker-java:docker-java-transport-httpclient5
Bumps [com.github.docker-java:docker-java-transport-httpclient5](https://github.com/docker-java/docker-java) from 3.5.3 to 3.6.0.
- [Release notes](https://github.com/docker-java/docker-java/releases)
- [Changelog](https://github.com/docker-java/docker-java/blob/main/CHANGELOG.md)
- [Commits](https://github.com/docker-java/docker-java/compare/3.5.3...3.6.0)

---
updated-dependencies:
- dependency-name: com.github.docker-java:docker-java-transport-httpclient5
  dependency-version: 3.6.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-20 13:57:10 +02:00
dependabot[bot]
da8992f130 build(deps): bump com.google.cloud:libraries-bom from 26.65.0 to 26.66.0
Bumps [com.google.cloud:libraries-bom](https://github.com/googleapis/java-cloud-bom) from 26.65.0 to 26.66.0.
- [Release notes](https://github.com/googleapis/java-cloud-bom/releases)
- [Changelog](https://github.com/googleapis/java-cloud-bom/blob/main/release-please-config.json)
- [Commits](https://github.com/googleapis/java-cloud-bom/compare/v26.65.0...v26.66.0)

---
updated-dependencies:
- dependency-name: com.google.cloud:libraries-bom
  dependency-version: 26.66.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-20 13:56:43 +02:00
dependabot[bot]
e448690086 build(deps): bump software.amazon.awssdk:bom from 2.32.21 to 2.32.26
Bumps software.amazon.awssdk:bom from 2.32.21 to 2.32.26.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.32.26
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-20 13:56:21 +02:00
Florian Hussonnois
3929bf6172 feat(system): add distinct server-events for reporting
Refactor the services used to generate periodic reports on server usage.

Related-to: kestra-io/kestra-ee#3014
2025-08-20 12:20:31 +02:00
Piyush Bhaskar
ab9951466d feat(core): implement tab tracking on editor events (#10781) 2025-08-20 14:26:46 +05:30
brian.mulier
ef59a6de26 fix(tests): add test on task runners to assert they can work and transmit their wdir 2025-08-20 09:55:01 +02:00
lizi3
0a64ae7e63 perf(sql):Optimize SQL performance by replacing SQL_CALC_FOUND_ROWS with COUNT(*) 2025-08-20 09:54:39 +02:00
lizi3
8c3cd2856a perf(sql):Optimize SQL performance by replacing SQL_CALC_FOUND_ROWS with COUNT(*) 2025-08-20 09:54:39 +02:00
AJ Emerich
6def8ef831 fix(webhook-trigger): fix documentation typos (#10790) 2025-08-20 09:18:42 +02:00
Piyush Bhaskar
0cc1bffc20 refactor(core): new No Data page when no versioned plugins (#10751)
* refactor(core): new No Data page when no versioned plugins

* chore(core): localize to languages other than english (#10752)

Co-authored-by: GitHub Action <actions@github.com>

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: GitHub Action <actions@github.com>
2025-08-20 12:23:51 +05:30
Barthélémy Ledoux
3bdf55a649 refactor(flows): rename MultiPanelFlowEditorView and extract MultiPanelEditorTabs component (#10783) 2025-08-19 16:31:34 +02:00
Barthélémy Ledoux
767a375292 fix(ui): update context docs menu links and section titles (#10768) 2025-08-19 16:24:16 +02:00
brian.mulier
1509ce9b98 fix(core): change cache policy on files returned by webserver that needs to stay fresh
closes #7499
2025-08-19 11:52:48 +02:00
brian.mulier
5a3f3d3312 fix(namespaces): properly send editor content upon creating / updating ns file
part of #7499
2025-08-19 11:52:48 +02:00
Roman Acevedo
6394c337ae fix(tests): filter out ExecutionKind.TEST from FlowTriggers
- fixes Silence flow trigger on TEST-kind executions kestra-ee#4689
2025-08-19 11:03:48 +02:00
Piyush Bhaskar
be4518466f fix(kv): fixes KV creation using authStore 2025-08-19 11:39:29 +05:30
Piyush Bhaskar
543bed48c9 feat(core): changes to introduce Namespace Context (#10750) 2025-08-19 11:06:58 +05:30
Barthélémy Ledoux
5e57d11b73 refactor: make auth store use pinia (#10558)
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
Co-authored-by: Piyush-r-bhaskar <impiyush0012@gmail.com>
2025-08-18 15:32:13 +02:00
Piyush Bhaskar
98189392a2 fix(core): show flow graph inside blueprint detail (#10771) 2025-08-18 17:59:13 +05:30
Piyush Bhaskar
ac9a01964a refactor(executions): implement splitter for execution outputs (#10677) 2025-08-18 17:58:50 +05:30
Piyush Bhaskar
8479323f97 refactor: migrate Splitpanes for Element Plus el-splitter. (#10669)
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-08-18 16:05:34 +05:30
brian.mulier
4b80b92423 refacto(namespaces): uniformize namespaces usage & retrieval
closes kestra-io/kestra-ee#3500
closes kestra-io/kestra-ee#3356
closes kestra-io/kestra-ee#3163
closes kestra-io/kestra-ee#4713
closes kestra-io/kestra-ee#3210
closes kestra-io/kestra#10700
related to kestra-io/kestra#10701
2025-08-18 12:18:47 +02:00
brian.mulier
2e7d714bcb chore(deps): bump ui-libs from 0.0.237 to 0.0.238 2025-08-18 12:18:47 +02:00
Roman Acevedo
73cf7f04fb test(e2e): make sure used docker image is local 2025-08-18 12:03:44 +02:00
Roman Acevedo
ac0ab7e8fa Revert "build(deps): bump com.gradleup.shadow from 8.3.9 to 9.0.1"
This reverts commit fa6da9bd0b.
2025-08-18 12:03:44 +02:00
Roman Acevedo
c1876e69ed test(e2e): print logs if backend failed to start 2025-08-18 12:03:44 +02:00
Roman Acevedo
cf73a80f2e test(e2e): fix e2e marked as cancelled when near timeout 2025-08-18 12:03:44 +02:00
Barthélémy Ledoux
53687f4a1f fix(core): avoid triggering hundreds of reactivity updates for each icon (#10766) 2025-08-18 11:37:37 +02:00
Florian Hussonnois
749bf94125 fix(core): fix preconditions rendering for ExecutionOutputs (#10651)
Ensure that preconditions are always re-rendered for any
new executions

Changes:
* add new fluent skipCache methods on RunContextProperty and Property
  classes

Fix: #10651
2025-08-18 09:24:58 +02:00
Nicolas K.
25a7994f63 fix(test): disable kafka concurrency queue test (#10755)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-14 16:59:21 +02:00
Anna Geller
e03c894f3a fix: spelling 2025-08-14 15:27:09 +02:00
Piyush Bhaskar
99772c1a48 fix(ui): fixes logo cut off on no permission interface (#10739) 2025-08-14 18:43:28 +05:30
Roman Acevedo
93d6b816bf fix(tests): namespace binding was breaking filtering in Flow page
fixes https://github.com/kestra-io/kestra-ee/issues/4691

the additional namespace binding in Tabs was added in PR https://github.com/kestra-io/kestra/pull/10543 to solve the special case of Namespace creation
2025-08-14 13:39:42 +02:00
Nicolas K.
a3b0512bec feat(storages): #10636 add tenant id to mock trigger (#10749)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-14 12:07:21 +02:00
Loïc Mathieu
265f72b629 fix(execution): parallel flowable may not ends all child flowable
Parallel flowable tasks like `Parallel`, `Dag` and `ForEach` are racy. When a task fail in a branch, other concurrent branches that have flowable may never ends.
We make sure that all children are terminated when a flowable is itself terminated.

Fixes #6780
2025-08-14 12:06:15 +02:00
YannC
07a8d9a665 fix: avoid file being displayed as diff in namespace file editor (#10746)
close #10744
2025-08-14 10:38:33 +02:00
Piyush Bhaskar
59bd607db2 refactor(misc): add misc module to override (#10737) 2025-08-14 13:48:29 +05:30
Nicolas K.
1618815df4 Feat/add get path without tenant (#10741)
* feat(storages): #10636 add get path without tenant id

* feat(storages): #10636 remove first / from get path method

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-13 17:48:03 +02:00
Nicolas K.
a2c3799ab7 feat(storages): #10636 add get path without tenant id (#10740)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-13 16:51:09 +02:00
Loïc Mathieu
986a2b4d11 chore(ci): don't run docker PR image workflow on forks 2025-08-13 15:32:41 +02:00
Loïc Mathieu
cdd591dab7 fix(tests): makes JdbcQueueTest less flaky 2025-08-13 14:56:39 +02:00
Malaydewangan09
9f5cf5aeb9 fix(): subgroups for better readability 2025-08-13 14:41:47 +05:30
Nicolas K.
cc5f73ae06 wip(storages): add non tenant dependant method to storage interface (#10637)
* wip(storages): add non tenant dependant method to storage interface

* feat(storages): #10636 add instance method to retrieve resources without the tenant id

* fix(stores): #4353 failing unit tests after now that tenant id can't be null

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-13 11:00:25 +02:00
dependabot[bot]
e461e46a1c build(deps): bump io.micrometer:micrometer-core from 1.15.2 to 1.15.3
Bumps [io.micrometer:micrometer-core](https://github.com/micrometer-metrics/micrometer) from 1.15.2 to 1.15.3.
- [Release notes](https://github.com/micrometer-metrics/micrometer/releases)
- [Commits](https://github.com/micrometer-metrics/micrometer/compare/v1.15.2...v1.15.3)

---
updated-dependencies:
- dependency-name: io.micrometer:micrometer-core
  dependency-version: 1.15.3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:52:52 +02:00
dependabot[bot]
fa6da9bd0b build(deps): bump com.gradleup.shadow from 8.3.9 to 9.0.1
Bumps [com.gradleup.shadow](https://github.com/GradleUp/shadow) from 8.3.9 to 9.0.1.
- [Release notes](https://github.com/GradleUp/shadow/releases)
- [Commits](https://github.com/GradleUp/shadow/compare/8.3.9...9.0.1)

---
updated-dependencies:
- dependency-name: com.gradleup.shadow
  dependency-version: 9.0.1
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:52:21 +02:00
dependabot[bot]
3cb6815eac build(deps): bump org.assertj:assertj-core from 3.27.3 to 3.27.4
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.27.3 to 3.27.4.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.27.3...assertj-build-3.27.4)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-version: 3.27.4
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:19:45 +02:00
dependabot[bot]
bde9972b26 build(deps): bump actions/checkout from 4 to 5
Bumps [actions/checkout](https://github.com/actions/checkout) from 4 to 5.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:09:06 +02:00
dependabot[bot]
bc828efec9 build(deps): bump software.amazon.awssdk.crt:aws-crt
Bumps [software.amazon.awssdk.crt:aws-crt](https://github.com/awslabs/aws-crt-java) from 0.38.8 to 0.38.9.
- [Release notes](https://github.com/awslabs/aws-crt-java/releases)
- [Commits](https://github.com/awslabs/aws-crt-java/compare/v0.38.8...v0.38.9)

---
updated-dependencies:
- dependency-name: software.amazon.awssdk.crt:aws-crt
  dependency-version: 0.38.9
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:07:37 +02:00
dependabot[bot]
c62f503f1a build(deps): bump software.amazon.awssdk:bom from 2.32.16 to 2.32.21
Bumps software.amazon.awssdk:bom from 2.32.16 to 2.32.21.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.32.21
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:06:21 +02:00
dependabot[bot]
15a6323122 build(deps): bump flyingSaucerVersion from 9.13.1 to 9.13.2
Bumps `flyingSaucerVersion` from 9.13.1 to 9.13.2.

Updates `org.xhtmlrenderer:flying-saucer-core` from 9.13.1 to 9.13.2
- [Release notes](https://github.com/flyingsaucerproject/flyingsaucer/releases)
- [Changelog](https://github.com/flyingsaucerproject/flyingsaucer/blob/main/CHANGELOG.md)
- [Commits](https://github.com/flyingsaucerproject/flyingsaucer/compare/v9.13.1...v9.13.2)

Updates `org.xhtmlrenderer:flying-saucer-pdf` from 9.13.1 to 9.13.2
- [Release notes](https://github.com/flyingsaucerproject/flyingsaucer/releases)
- [Changelog](https://github.com/flyingsaucerproject/flyingsaucer/blob/main/CHANGELOG.md)
- [Commits](https://github.com/flyingsaucerproject/flyingsaucer/compare/v9.13.1...v9.13.2)

---
updated-dependencies:
- dependency-name: org.xhtmlrenderer:flying-saucer-core
  dependency-version: 9.13.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
- dependency-name: org.xhtmlrenderer:flying-saucer-pdf
  dependency-version: 9.13.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:05:10 +02:00
dependabot[bot]
21cb7b497d build(deps): bump org.jooq:jooq from 3.20.5 to 3.20.6
Bumps org.jooq:jooq from 3.20.5 to 3.20.6.

---
updated-dependencies:
- dependency-name: org.jooq:jooq
  dependency-version: 3.20.6
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:03:54 +02:00
Loïc Mathieu
26cb6ef9ad fix(execution): concurrency limit didn't work with afterExecutions
This is because the execution is never considered fully terminated so concurrency limit is not handled properly.
This should also affect SLA, trigger lock, and other cleaning stuff.

The root issue is that, with a worker task from afterExecution, there are no other update on the execution itself (as it's already terminated) so no execution messages are again processed by the executor.

Because of that, the worker task result message from the afterExecution block is the last message, but unfortunatly as messages from the worker task result have no flow attached, the computation of the final termination is incorrect.
The solution is to load the flow if null inside the executor and the execution is terminated which should only occurs inside afterExecution.

Fixes #10657
Fixes #8459
Fixes #8609
2025-08-13 09:29:46 +02:00
Piyush Bhaskar
95c438515d fix(core): pass viewTypes to initYamlSource (#10704) 2025-08-13 12:32:17 +05:30
Florian Hussonnois
194ae826e5 chore(system): add WorkerJobQueueInterface to properly pass workerId on subscribe 2025-08-12 19:26:31 +02:00
Prayag
31dbecec77 fix(core): Enter key is now validating filter / refreshing data (#9630)
closes #9471

---------

Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-08-12 17:23:10 +02:00
Anna Geller
b39bcce2e8 fix(translation): close https://github.com/kestra-io/kestra/issues/9857 2025-08-12 13:00:14 +02:00
github-actions[bot]
95ac5ce8a7 chore(core): localize to languages other than english (#10697)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-08-12 12:54:34 +02:00
Piyush Bhaskar
90f913815d fix(core): fix misc store to access configs. (#10692) 2025-08-12 16:24:17 +05:30
Anna Geller
5944db5cc8 fix: translation for sample prompt (#10696) 2025-08-12 12:51:10 +02:00
Loïc Mathieu
577f813eef fix(executions): SLA monitor should take into account restarted executions 2025-08-12 11:46:58 +02:00
Loïc Mathieu
06a9f13676 fix(executions): concurrency limit exceeded when restarting an execution
Fixes #7880
2025-08-12 11:46:58 +02:00
Loïc Mathieu
1fd6e23f96 feat(flows): Flow SLA out of beta
Part-of: https://github.com/kestra-io/kestra-ee/issues/4555
2025-08-12 11:29:32 +02:00
Piyush Bhaskar
9a32780c8c fix(flow): fixes flow deletion inside actions (#10693) 2025-08-12 14:56:31 +05:30
Nicolas K.
af140baa66 Feat/add filters to repositories (#10629)
* wip(repositories): use query filter in the log repository

* feat(repositories): #10628 refactor query builder engine

* fix(repositories): #10628 add sort to findAsych query

* Update core/src/main/java/io/kestra/core/utils/ListUtils.java

Co-authored-by: Loïc Mathieu <loikeseke@gmail.com>

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
Co-authored-by: Loïc Mathieu <loikeseke@gmail.com>
2025-08-12 11:17:47 +02:00
Florian Hussonnois
54b0183b95 fix(system): avoid unsupported type error on ServiceType enum 2025-08-12 10:01:30 +02:00
Loïc Mathieu
64de3d5fa8 fix(executions): correctly fail the request when trying to resume an execution with the wrong inputs
Fixes #9959
2025-08-12 09:39:02 +02:00
Piyush Bhaskar
4c17aadb81 fix(ui): more visible color for deafult edge (#10690) 2025-08-12 12:44:20 +05:30
Piyush Bhaskar
bf424fbf53 fix(core): reduce size of code block text and padding (#10689) 2025-08-12 11:46:52 +05:30
brian.mulier
edcdb88559 fix(dashboard): avoid duplicate dashboard calls + properly refresh dashboards on refresh button + don't discard component entirely on refresh 2025-08-11 22:28:19 +02:00
brian.mulier
9a9d0b995a fix(dashboard): properly use time filters in queries
closes kestra-io/kestra-ee#4389
2025-08-11 22:28:19 +02:00
brian-mulier-p
5c5d313fb0 fix(metrics): restore autocompletion on metrics filter (#10688) 2025-08-11 21:08:56 +02:00
Nicolas K.
dfd4d87867 feat(releases): add test jar to meven central deployment (#10675)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-11 15:56:51 +02:00
Piyush Bhaskar
367d773a86 fix(flows): enable the save and makes tab dirty when have unsaved changes in no code (#10671) 2025-08-11 18:35:56 +05:30
brian.mulier
c819f15c66 tests(core): add a test to taskrunners to ensure it's working multiple times on the same working directory
part of kestra-io/plugin-ee-kubernetes#45
2025-08-11 14:59:15 +02:00
Loïc Mathieu
673b5c994c feat(flows): add upstream dependencies in flow dependencies
Closes #10638
2025-08-11 12:43:33 +02:00
Loïc Mathieu
2acf37e0e6 fix(executions): properly fail the task if it contains unsupported unicode sequence
This occurs in Postgres using the `\u0000` unicode sequence. Postgres refuse to store any JSONB with this sequence as it has no textual representation.
We now properly detect that and fail the task.

Fixes #10326
2025-08-11 11:53:39 +02:00
Ludovic DEHON
0d7fcbb936 build(core): create a docker image for each pull request (#10644)
relate to kestra-io/kestra#10643
2025-08-09 00:18:28 +02:00
Miloš Paunović
42b01d6951 chore(core): reload number of dependencies on flow save action (#10663)
Closes https://github.com/kestra-io/kestra/issues/10484.
2025-08-08 15:11:41 +02:00
Miloš Paunović
9edfb01920 chore(core): uniform dependency table namespace label (#10655) 2025-08-08 13:14:53 +02:00
Miloš Paunović
7813337f48 fix(core): ensure dependency table updates occur after dom is fully rendered (#10654)
Closes https://github.com/kestra-io/kestra/issues/10639.
2025-08-08 12:52:16 +02:00
Miloš Paunović
ea0342f82a refactor(core): remove revision property from flow nodes in dependency graph (#10650)
Related to https://github.com/kestra-io/kestra/issues/10633.
2025-08-08 12:21:01 +02:00
Piyush Bhaskar
ca8f25108e fix(core): update flow store usage. (#10649) 2025-08-08 11:34:09 +02:00
Miloš Paunović
49b6c331a6 chore(core): amend edge color scheme in execution dependency graph (#10648)
Related to https://github.com/kestra-io/kestra/issues/10639.
2025-08-08 11:29:11 +02:00
Miloš Paunović
e409fb7ac0 chore(core): lower the wheel sensitivity on zooming of dependency graph (#10647)
Relates to https://github.com/kestra-io/kestra/issues/10639.
2025-08-08 10:27:51 +02:00
Miloš Paunović
0b64c29794 fix(flows): properly import pinia store into a dependency graph composable (#10646) 2025-08-08 10:25:58 +02:00
Piyush Bhaskar
c4665460aa fix(flows): copy trigger url propely. (#10645) 2025-08-08 12:57:41 +05:30
Barthélémy Ledoux
5423b6e3a7 refactor: move flow store to pinia (#10620) 2025-08-08 09:04:33 +02:00
Vanshika Kumar
114669e1b5 chore(core): add padding around user image in left sidebar (#10553)
Co-authored-by: Vanshika Kumar <vanshika.kumar-ext@infra.market>
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-08-08 08:34:23 +02:00
Loïc Mathieu
d75f0ced38 fix(executions): allow caching tasks that use the 'workingDir' variable
Fixes #10253
2025-08-07 17:26:24 +02:00
brian.mulier
0a788d8429 fix(core): ensure props with defaults are not marked as required in generated doc 2025-08-07 15:07:00 +02:00
brian.mulier
8c25d1bbd7 fix(core): wrong @NotNull import leading to key not being marked as required
closes #9287
2025-08-07 15:07:00 +02:00
YannC
4e2e8f294f fix: avoid calling nextExecutionDate if value is null when resetting trigger (#10547) 2025-08-07 14:51:27 +02:00
Barthélémy Ledoux
2c34804ce2 fix(core): update necessary node viewer in gradle build (#10624) 2025-08-07 13:38:29 +02:00
Piyush Bhaskar
bab4eef790 refactor(namespace): migrate namespace module to pinia (#10571)
* refactor(namespace): migrate namespace module to pinia

* refactor(namespaces): override the store and fix the test

* fix:  test in good way

* refactor: rename action as ee

* refactor: state and action is different

* refactor:  namespaces store in composition  API and composable to use the common state, actions

* fix: export validate
2025-08-07 16:20:51 +05:30
Miloš Paunović
94aa628ac1 feat(core): implement different graph type for dependencies view (#10240)
Closes https://github.com/kestra-io/kestra/issues/5350.
Closes https://github.com/kestra-io/kestra/issues/10446.
Closes https://github.com/kestra-io/kestra/issues/10563.
Closes https://github.com/kestra-io/kestra-ee/issues/3431.
Closes https://github.com/kestra-io/kestra-ee/issues/4509.

Relates to https://github.com/kestra-io/kestra/issues/10484.
Relates to https://github.com/kestra-io/kestra-ee/issues/3550.
2025-08-07 12:12:12 +02:00
Loïc Mathieu
da180fbc00 chore(system): add a note on MapUtils.nestedToFlattenMap() method 2025-08-07 12:01:31 +02:00
Anna Geller
c7bd592bc7 fix(ai-agent): add prompt suggestion 2025-08-07 10:42:35 +02:00
Florian Hussonnois
693d174960 chore(system): provide a more useful Either utility class
Rewrite and add tests to Either class to be a bit
more useable
2025-08-07 10:31:28 +02:00
Florian Hussonnois
8ee492b9c5 fix(system): fix consumer commit on JDBC queue
Ensure that JDBC queue records are committed to the consumer
after processing. This fixes a rare issue where executions could be blocked after a runner crash.
2025-08-07 10:31:17 +02:00
Loïc Mathieu
d6b8ba34ea chore(system): provide a MapUtils.nestedToFlattenMap() method
It will be used to nest a previously flatten map when needed.
2025-08-07 10:00:13 +02:00
dependabot[bot]
08cc853e00 build(deps): bump software.amazon.awssdk.crt:aws-crt
Bumps [software.amazon.awssdk.crt:aws-crt](https://github.com/awslabs/aws-crt-java) from 0.38.7 to 0.38.8.
- [Release notes](https://github.com/awslabs/aws-crt-java/releases)
- [Commits](https://github.com/awslabs/aws-crt-java/compare/v0.38.7...v0.38.8)

---
updated-dependencies:
- dependency-name: software.amazon.awssdk.crt:aws-crt
  dependency-version: 0.38.8
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-07 09:19:01 +02:00
dependabot[bot]
4f68715483 build(deps): bump org.apache.commons:commons-compress
Bumps [org.apache.commons:commons-compress](https://github.com/apache/commons-compress) from 1.27.1 to 1.28.0.
- [Changelog](https://github.com/apache/commons-compress/blob/master/RELEASE-NOTES.txt)
- [Commits](https://github.com/apache/commons-compress/compare/rel/commons-compress-1.27.1...rel/commons-compress-1.28.0)

---
updated-dependencies:
- dependency-name: org.apache.commons:commons-compress
  dependency-version: 1.28.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-07 09:18:02 +02:00
Karthik D
edde1b6730 fix(core): fixes overflow of outputs content
* fix

* fix

* fix: minor tweaks

* fix: scope the style

---------

Co-authored-by: Piyush-r-bhaskar <impiyush0012@gmail.com>
2025-08-07 12:37:44 +05:30
Biplab Bera
399446f52e feat: disabled the preview button in output tabs for zip files (#10535)
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
2025-08-07 11:56:58 +05:30
Florian Hussonnois
c717890fbc fix(build): fix and enhance release-plugins.sh
Skip gradle release when tag already exists
Check for staging files before commiting
2025-08-06 17:17:50 +02:00
Barthélémy Ledoux
5328b0c574 fix(flows): allow date inputs in playground (#10611) 2025-08-06 15:36:29 +02:00
Barthélémy Ledoux
de14cae1f0 fix(flows): playground only clear highlighted lines on leave task (#10612) 2025-08-06 15:36:17 +02:00
Miloš Paunović
d8a3e703e7 feat(core): add animated edges to topology graph (#10616)
Closes kestra-io/kestra#10614.
2025-08-06 14:49:31 +02:00
dependabot[bot]
90659bc320 build(deps): bump com.azure:azure-sdk-bom from 1.2.36 to 1.2.37
Bumps [com.azure:azure-sdk-bom](https://github.com/azure/azure-sdk-for-java) from 1.2.36 to 1.2.37.
- [Release notes](https://github.com/azure/azure-sdk-for-java/releases)
- [Commits](https://github.com/azure/azure-sdk-for-java/compare/azure-sdk-bom_1.2.36...azure-sdk-bom_1.2.37)

---
updated-dependencies:
- dependency-name: com.azure:azure-sdk-bom
  dependency-version: 1.2.37
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 12:55:33 +02:00
dependabot[bot]
37d1d8856e build(deps): bump software.amazon.awssdk:bom from 2.32.11 to 2.32.16
Bumps software.amazon.awssdk:bom from 2.32.11 to 2.32.16.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.32.16
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 11:56:59 +02:00
Florian Hussonnois
93a4eb5cbc build: add plugin-datagen to plugin list 2025-08-06 11:11:46 +02:00
Miloš Paunović
de160c8a2d chore(deps): regular dependency update (#10607)
Performing a weekly round of dependency updates in the NPM ecosystem to keep everything up to date.
2025-08-06 10:20:32 +02:00
dependabot[bot]
28458b59eb build(deps): bump com.mysql:mysql-connector-j from 9.3.0 to 9.4.0
Bumps [com.mysql:mysql-connector-j](https://github.com/mysql/mysql-connector-j) from 9.3.0 to 9.4.0.
- [Changelog](https://github.com/mysql/mysql-connector-j/blob/release/9.x/CHANGES)
- [Commits](https://github.com/mysql/mysql-connector-j/compare/9.3.0...9.4.0)

---
updated-dependencies:
- dependency-name: com.mysql:mysql-connector-j
  dependency-version: 9.4.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:50:39 +02:00
dependabot[bot]
2a256d9505 build(deps): bump org.eclipse.angus:jakarta.mail from 2.0.3 to 2.0.4
Bumps org.eclipse.angus:jakarta.mail from 2.0.3 to 2.0.4.

---
updated-dependencies:
- dependency-name: org.eclipse.angus:jakarta.mail
  dependency-version: 2.0.4
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:50:20 +02:00
dependabot[bot]
9008b21007 build(deps): bump com.google.cloud:libraries-bom from 26.64.0 to 26.65.0
Bumps [com.google.cloud:libraries-bom](https://github.com/googleapis/java-cloud-bom) from 26.64.0 to 26.65.0.
- [Release notes](https://github.com/googleapis/java-cloud-bom/releases)
- [Changelog](https://github.com/googleapis/java-cloud-bom/blob/main/release-please-config.json)
- [Commits](https://github.com/googleapis/java-cloud-bom/compare/v26.64.0...v26.65.0)

---
updated-dependencies:
- dependency-name: com.google.cloud:libraries-bom
  dependency-version: 26.65.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:49:35 +02:00
dependabot[bot]
8c13bf6a71 build(deps): bump com.gradleup.shadow from 8.3.8 to 8.3.9
Bumps [com.gradleup.shadow](https://github.com/GradleUp/shadow) from 8.3.8 to 8.3.9.
- [Release notes](https://github.com/GradleUp/shadow/releases)
- [Commits](https://github.com/GradleUp/shadow/compare/8.3.8...8.3.9)

---
updated-dependencies:
- dependency-name: com.gradleup.shadow
  dependency-version: 8.3.9
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:49:06 +02:00
dependabot[bot]
43888cc3dd build(deps): bump actions/download-artifact from 4 to 5
Bumps [actions/download-artifact](https://github.com/actions/download-artifact) from 4 to 5.
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](https://github.com/actions/download-artifact/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/download-artifact
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:48:47 +02:00
Piyush Bhaskar
c94093d9f6 fix(flows): ensure plugin documentation change on flow switch (#10546)
Co-authored-by: Barthélémy Ledoux <bledoux@kestra.io>
2025-08-05 14:29:36 +05:30
Barthélémy Ledoux
8779dec28a fix(flows): add conditional rendering for restart button based on execution (#10570) 2025-08-05 10:22:13 +02:00
Nicolas K.
41614c3a6e feat(stores): #4353 list all KV for namespace and parent namespaces (#10470)
* feat(stores): #4353 list all KV for namespace and parent namespaces

* feat(stores): #4353 list all KV for namespace and parent namespaces

* feat(stores): #4353 list all KV for namespace and parent namespaces

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-05 09:55:41 +02:00
Barthélémy Ledoux
6b4fdd0688 fix: restore InputForm (#10568) 2025-08-05 09:44:39 +02:00
Loïc Mathieu
0319f3d267 feat(system): set the default number of worker threads to 8x available cpu cores
This is a better default for mixed workloads and provides better tail latency.
This is also what we advise to our customer.
2025-08-05 09:19:14 +02:00
brian.mulier
0b37fe2cb8 fix(namespaces): autocomplete in kv & secrets
related to kestra-io/kestra-ee#4559
2025-08-04 20:29:56 +02:00
brian.mulier
e623dd7729 fix(executions): avoid SSE error in follow execution dependencies
closes #10560
2025-08-04 20:22:32 +02:00
Barthélémy Ledoux
db4f7cb4ff fix(flows)*: load flow for execution needs to be stored most of the time (#10566) 2025-08-04 18:54:01 +02:00
Abhilash T
b14b16db0e fix: Updated InputsForm.vue to clear Radio Button Selection (#9654)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-08-04 16:03:25 +02:00
brian.mulier
77f6cec0e4 fix(executions): restore execution redirect & subflow logs view from parent
closes #10528
closes #10551
2025-08-04 15:46:48 +02:00
Piyush Bhaskar
1748b18d66 chore(core): remove variable and directly assign. (#10554) 2025-08-04 18:45:19 +05:30
Piyush Bhaskar
32f96348c1 fix(core): proper state detection from parsed data (#10527) 2025-08-04 18:41:05 +05:30
Barthélémy Ledoux
07db0a8c80 fix(flows): no-code - when changing type message avoid warning (#10498) 2025-08-04 14:57:28 +02:00
Barthélémy Ledoux
2035fd42c3 refactor: use composition api and ts on revision component (#10529) 2025-08-04 14:56:36 +02:00
Barthélémy Ledoux
2856bf07e8 refactor: move editor from vuex to pinia (#10533)
Co-authored-by: Piyush-r-bhaskar <impiyush0012@gmail.com>
2025-08-04 14:55:55 +02:00
Barthélémy Ledoux
f5327cec33 fix: remove debugging value from playground (#10541) 2025-08-04 14:54:45 +02:00
Anna Geller
42955936b2 fix: demo no longer exists 2025-08-04 14:38:13 +02:00
Miloš Paunović
771b98e023 chore(namespaces): add the needed prop for loading all namespaces inside a selector (#10544) 2025-08-04 12:44:38 +02:00
Miloš Paunović
b80e8487e3 fix(namespaces): amend problems with namespace secrets and kv pairs (#10543)
Closes https://github.com/kestra-io/kestra-ee/issues/4584.
2025-08-04 12:19:52 +02:00
YannC.
f35a0b6d60 fix: add missing webhook releases secrets for github releases 2025-08-01 23:21:27 +02:00
brian.mulier
0c9ed17f1c fix(core): remove icon for inputs in no-code
closes #10520
2025-08-01 16:32:08 +02:00
brian.mulier
7ca20371f8 fix(executions): avoid race condition leading to never-ending follow with non-terminal state 2025-08-01 13:12:14 +02:00
brian.mulier
8ff3454cbd fix(core): ensure instances can read all messages when no consumer group / queue type 2025-08-01 13:12:14 +02:00
Piyush Bhaskar
09593d9fd2 fix(namespaces): fixes loading of additional ns (#10518) 2025-08-01 16:28:01 +05:30
Loïc Mathieu
d3cccf36f0 feat(flow): pull up description to the FlowInterface
This avoid the need to parse the flow for ex by AI to get the description.
2025-08-01 12:43:49 +02:00
Loïc Mathieu
eeb91cd9ed fix(tests): RunContextLoggerTest.secrets(): wrong number of logs in awaitLogs() 2025-08-01 12:41:41 +02:00
Loïc Mathieu
2679b0f067 feat(flows): warn on runnable only properties on non-runnable tasks
Closes #9967
Closes #10500
2025-08-01 12:41:08 +02:00
Piyush Bhaskar
54281864c8 fix(executions): do not rely on monaco to get value (#10515) 2025-08-01 13:23:43 +05:30
Loïc Mathieu
e4f9b11d0c fix(ci): workflow build artifact doesn't need the plugin version 2025-08-01 09:41:48 +02:00
Barthélémy Ledoux
12cef0593c fix(flows): playground need to use ui-libs (#10506) 2025-08-01 09:06:11 +02:00
Piyush Bhaskar
c6cf8f307f fix(flows): route to flow page (#10514) 2025-08-01 12:10:56 +05:30
Piyush Bhaskar
3b4eb55f84 fix(executions): properly handle methods and computed for tabs (#10513) 2025-08-01 12:10:27 +05:30
YannC
d32949985d fix: handle empty flows list in lastExecutions correctly (#10493) 2025-08-01 07:21:00 +02:00
YannC
c051ca2e66 fix(ui): load correctly filters + refresh dashboard on filter change (#10504) 2025-08-01 07:15:46 +02:00
Piyush Bhaskar
93a456963b fix(editor): adjust padding for editor (#10497)
* fix(editor): adjust padding for editor

* fix: make padding 16px
2025-07-31 19:10:46 +05:30
YannC.
9a45f17680 fix(ci): do not run github release on tag 2025-07-31 14:37:51 +02:00
github-actions[bot]
5fb6806d74 chore(core): localize to languages other than english (#10494)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 17:44:10 +05:30
Barthélémy Ledoux
f3cff72edd fix(flows): forget all old taskRunId when a new execution (#10487) 2025-07-31 13:41:57 +02:00
Barthélémy Ledoux
0abc660e7d fix(flows): wait longer for widgets to be rendered (#10485) 2025-07-31 13:41:46 +02:00
Barthélémy Ledoux
f09ca3d92e fix(flows): load flows documentation when coming back to no-code root (#10374) 2025-07-31 13:41:36 +02:00
YannC
9fd778fca1 feat(ui): added http method autocompletion (#10492) 2025-07-31 13:28:59 +02:00
Loïc Mathieu
667af25e1b fix(executions): Don't create outputs from the Subflow task when we didn't wait
As, well, if we didn't wait for the subflow execution, we cannot have access to its outputs.
2025-07-31 13:06:58 +02:00
github-actions[bot]
1b1aed5ff1 chore(core): localize to languages other than english (#10489)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 12:14:37 +02:00
Barthélémy Ledoux
da1bb58199 fix(flows): add the load errors to the flow errors (#10483) 2025-07-31 11:53:43 +02:00
Loïc Mathieu
d3e661f9f8 feat(system): improve performance of computeSchedulable
- Store flowIds in a list to avoid computing the multiple times
- Storeg triggers by ID in a map to avoid iterating the list of triggers for each flow
2025-07-31 11:35:01 +02:00
yuri1969
2126c8815e feat(core): validate URL configuration
Used the `ServerCommandValidator` style.

BREAKING CHANGE: app won't start due invalid `kestra.url`
2025-07-31 11:24:21 +02:00
yuri1969
6cfc5b8799 fix(build): reduce Gradle warnings 2025-07-31 11:21:01 +02:00
Barthélémy Ledoux
16d44034f0 fix(flows): hide executionkind meta in the logs (#10482) 2025-07-31 10:50:34 +02:00
Barthélémy Ledoux
f76e62a4af fix(executions): do not rely on monaco to get value (#10467) 2025-07-31 09:28:33 +02:00
Piyush Bhaskar
f6645da94c fix(core): remove top spacing from no execution page and removing the redundant code (#10445)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-31 12:03:58 +05:30
github-actions[bot]
93b2bbf0d0 chore(core): localize to languages other than english (#10471)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 08:23:08 +02:00
Piyush Bhaskar
9d46e2aece fix(executions): make columns that are not links normal text (#10460)
* fix(executions): make it normal text

* fix(executions): use monospace font only
2025-07-31 10:33:33 +05:30
brian.mulier
133315a2a5 chore(deps): hardcode vue override version 2025-07-30 19:25:50 +02:00
brian.mulier
b96b9bb414 fix(core): avoid follow execution from being discarded too early
closes #10472
closes #7623
2025-07-30 19:25:50 +02:00
Barthélémy Ledoux
9865d8a7dc fix(flows): playground - implement new designs (#10459)
Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-07-30 17:54:46 +02:00
brian-mulier-p
29f22c2f81 fix(core): redesign playground run task button (#10423)
closes #10389
2025-07-30 15:23:33 +02:00
dependabot[bot]
3e69469381 build(deps): bump net.thisptr:jackson-jq from 1.3.0 to 1.4.0
Bumps [net.thisptr:jackson-jq](https://github.com/eiiches/jackson-jq) from 1.3.0 to 1.4.0.
- [Release notes](https://github.com/eiiches/jackson-jq/releases)
- [Commits](https://github.com/eiiches/jackson-jq/compare/1.3.0...1.4.0)

---
updated-dependencies:
- dependency-name: net.thisptr:jackson-jq
  dependency-version: 1.4.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-07-30 15:08:39 +02:00
dependabot[bot]
38c24ccf7f build(deps): bump software.amazon.awssdk:bom from 2.32.6 to 2.32.11
Bumps software.amazon.awssdk:bom from 2.32.6 to 2.32.11.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.32.11
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-07-30 15:07:49 +02:00
Loïc Mathieu
12cf41a309 fix(ci): don't publish docker in build-artifact 2025-07-30 14:42:16 +02:00
Malaydewangan09
7b8ea0d885 feat(plugins): add script plugins 2025-07-30 17:27:48 +05:30
Barthélémy Ledoux
cf88bbcb12 fix(flows): playground align restart button button (#10415) 2025-07-30 11:57:24 +02:00
Loïc Mathieu
6abe7f96e7 fix(ci): add missing build artifact job 2025-07-30 11:47:10 +02:00
Loïc Mathieu
e73ac78d8b build(ci): allow downloading the exe from the workflow and not the release
This would allow running the workflow even if the release step fail
2025-07-30 11:23:43 +02:00
François Delbrayelle
b0687eb702 fix(): fix icons 2025-07-30 10:28:10 +02:00
weibo1
85f9070f56 feat: Trigger Initialization Method Performance Optimization 2025-07-30 09:23:48 +02:00
YannC
0a42ab40ec fix(dashboard): pageSize & pageNumber is now correctly pass when fetching a chart (#10413) 2025-07-30 08:45:20 +02:00
Piyush Bhaskar
856d2d1d51 refactor(flows): remove execution chart (#10425) 2025-07-30 11:54:35 +05:30
YannC.
a7d6dbc8a3 feat(ci): allow to run github release ci on dispatch 2025-07-29 15:04:50 +02:00
YannC.
cf82109da6 fix(ci): correctly pass GH token to release workflow 2025-07-29 15:01:36 +02:00
Barthélémy Ledoux
d4168ba424 fix(flows): playground clear current execution when clearExecutions() (#10414) 2025-07-29 14:43:11 +02:00
Loïc Mathieu
46a294f25a chore(version): upgrade to v1.0.0-SNAPSHOT 2025-07-29 14:23:19 +02:00
Loïc Mathieu
a229036d8d chore(version): update to version 'v0.24.0-rc0-SNAPSHOT'. 2025-07-29 14:21:49 +02:00
François Delbrayelle
a518fefecd feat(plugins): add plugin-deepseek 2025-07-29 13:58:11 +02:00
Barthélémy Ledoux
1d3210fd7d fix(flows): remove text from warning button (#10370) 2025-07-29 11:27:37 +02:00
brian-mulier-p
597f84ecb7 fix(core): topology was no longer working on new flows (#10411)
closes #10354
2025-07-29 11:19:05 +02:00
Barthélémy Ledoux
5f3c7ac9f0 fix(core): allow icons api call to take longer than local call (#10412) 2025-07-29 11:13:12 +02:00
Nicolas K.
77c4691b04 fix(tests): rework basic auth service test (#10409)
* fix(tests): rework basic auth service test

* fix(tests): clean basic auth service test

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-29 10:59:43 +02:00
github-actions[bot]
6d34416529 chore(core): localize to languages other than english (#10410)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-29 13:55:25 +05:30
Piyush Bhaskar
40a67d5dcd feat(flows): add webhook curl and json field (#10392) 2025-07-29 13:48:28 +05:30
Loïc Mathieu
2c68c704f6 fix(test): flaky test JdbcQueueTest.withGroupAndType() 2025-07-29 09:35:28 +02:00
Miloš Paunović
e59d9f622c chore(namespaces): properly handle file name field on flow run dialog if set from defaults (#10390)
Closes https://github.com/kestra-io/kestra/issues/10365.
2025-07-29 08:29:08 +02:00
Piyush Bhaskar
c951ba39a7 fix(core): make validation less aggressive (#10406) 2025-07-29 11:41:38 +05:30
github-actions[bot]
a0200cfacb chore(core): localize to languages other than english (#10405)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-28 20:50:23 +02:00
dependabot[bot]
c6310f0697 build(deps): bump com.github.ben-manes.caffeine:caffeine
Bumps [com.github.ben-manes.caffeine:caffeine](https://github.com/ben-manes/caffeine) from 3.2.1 to 3.2.2.
- [Release notes](https://github.com/ben-manes/caffeine/releases)
- [Commits](https://github.com/ben-manes/caffeine/compare/v3.2.1...v3.2.2)

---
updated-dependencies:
- dependency-name: com.github.ben-manes.caffeine:caffeine
  dependency-version: 3.2.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-07-28 17:28:31 +02:00
dependabot[bot]
21ba59a525 build(deps): bump org.postgresql:postgresql in the gradle group
Bumps the gradle group with 1 update: [org.postgresql:postgresql](https://github.com/pgjdbc/pgjdbc).


Updates `org.postgresql:postgresql` from 42.7.6 to 42.7.7
- [Release notes](https://github.com/pgjdbc/pgjdbc/releases)
- [Changelog](https://github.com/pgjdbc/pgjdbc/blob/master/CHANGELOG.md)
- [Commits](https://github.com/pgjdbc/pgjdbc/compare/REL42.7.6...REL42.7.7)

---
updated-dependencies:
- dependency-name: org.postgresql:postgresql
  dependency-version: 42.7.7
  dependency-type: direct:production
  dependency-group: gradle
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-07-28 17:26:23 +02:00
Barthélémy Ledoux
4f9e3cd06c fix(flows): bring back alignment on overview (#10369) 2025-07-28 17:22:55 +02:00
Barthélémy Ledoux
e74010d1a4 fix(flows): playground - use all tasks as breakpoints (#10399) 2025-07-28 17:22:23 +02:00
Barthélémy Ledoux
465e6467e9 chore: update ui-libs (#10391) 2025-07-28 17:12:56 +02:00
YannC
c68c1b16d9 fix: set postgres and mysql queue offset as a bigint (#10344) 2025-07-28 16:28:09 +02:00
YannC
468c32156e chore: enforce micronaut open api version until Micronaut Platform works (#10393) 2025-07-28 16:27:59 +02:00
Loïc Mathieu
6e0a1c61ef fix(tests): increase the sleep inside ExecutionControllerRunnerTest.triggerExecutionAndFollowDependencies()
I'm not happy with that but I ran 3x 100 repetitions and all passed
2025-07-28 16:23:40 +02:00
Loïc Mathieu
552d55ef6b fix(test): RestactCaseTest.restartFailedWithFinally() should use executionService.isTerminated() 2025-07-28 16:23:40 +02:00
skayliu
08b0b682bf refactor(pebble): add more timestamp data time format 2025-07-28 16:17:55 +02:00
ben8t
cff90c93bb feat(plugin): add Notion plugin (#10049) 2025-07-28 16:09:01 +02:00
Roman Acevedo
ea465056d0 fix(triggers): bulk action on triggers did not take into account this is async (#10307) 2025-07-28 15:21:03 +02:00
Piyush Bhaskar
02f150f0b0 fix(core): animation on ai agent button (#10379)
* fix(core): animation on ai agent button

* reafctor(ai): add AITriggerButton component.
2025-07-28 18:48:34 +05:30
Loïc Mathieu
95d95d3d3c fix(tests): fix assertions in DateFilterTest.now() 2025-07-28 14:51:41 +02:00
Nicolas K.
6b8d3d6928 fix(tests): flaky test with wire mock not staring fast enough (#10383)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-28 14:07:48 +02:00
Piyush Bhaskar
1e347073ca fix(plugins): go to list of plugins from sidebar (#10385)
* fix(plugins): go to list of plugins from sidebar

* fix: update route to use path.
2025-07-28 17:24:09 +05:30
Loïc Mathieu
ac09dcecd9 fix(flows): wrong warning in FILE input
As we deprecate the `extension` property but still have a default for it, the warning is always disaply.
Removing the default and applying only when needed fixes the issue.

Fixes #10361
2025-07-28 13:40:11 +02:00
Miloš Paunović
40b337cd22 fix(namespaces): make sure the namespace parameter is properly passed when reading a file (#10384)
Relates to https://github.com/kestra-io/kestra/issues/10363.
Relates to https://github.com/kestra-io/kestra-ee/issues/4514.
2025-07-28 12:35:44 +02:00
Karthik D
5377d16036 chore(plugins): simplify the plugins search field (#10373)
Closes https://github.com/kestra-io/kestra/issues/10300.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-28 12:15:08 +02:00
Barthélémy Ledoux
f717bc413f tests: update frontend-tests (#10380) 2025-07-28 12:02:32 +02:00
Loïc Mathieu
d6bed2d235 fix(flows): file watching flow should delete the flow when the file is deleted while reading for update
When you use flow watching, a file can be updated then deleted, if it occurs quickly you could have a modified event, read the file then have a file not found exception as it has been deleted.
We should delete the flow in this case.

This has been detected because the related tests are flaky, doing that reduce falkiness of the related tests
2025-07-28 12:00:30 +02:00
Loïc Mathieu
07fd74b238 fix(test): flaky ExecutionControllerRunnerTest.triggerExecutionAndFollowDependencies() 2025-07-28 11:50:57 +02:00
Loïc Mathieu
60eef29de2 fix(tests): await for terminated execution instead of sleep in ExecutionControllerRunnerTest.killByIdShouldFailed() 2025-07-28 11:50:57 +02:00
Loïc Mathieu
20ca7b6380 fix(tests): increase wait time to 10 in LogConsumerTests.logs()
This  should reduce flakiness
2025-07-28 11:50:57 +02:00
Piyush Bhaskar
9d82df61c6 Revert "fix(core): fixes cumbersome operator selection (#10322)" (#10377)
This reverts commit e78210b5eb.
2025-07-28 14:52:58 +05:30
Piyush Bhaskar
e78210b5eb fix(core): fixes cumbersome operator selection (#10322) 2025-07-28 14:29:42 +05:30
Miloš Paunović
83143fae83 fix(executions): add default icons to execution dependency view (#10375)
Closes https://github.com/kestra-io/kestra/issues/10327.
2025-07-28 10:58:12 +02:00
Malay Dewangan
25f5ccc6b5 fix(runner): support floating-point CPU values in Docker runner (#10366) 2025-07-28 14:21:23 +05:30
Piyush Bhaskar
cf3e49a284 fix(core): give height to tooltip and use ks tokens (#10372) 2025-07-28 13:49:42 +05:30
JAGADEESH E
9a72d378df chore(core): amend missing property causing console warning in the settings page (#9788)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-28 09:49:14 +02:00
Loïc Mathieu
752a927fac feat(logs): allow LogShipper to filters on flowId and executionId
Closes https://github.com/kestra-io/kestra-ee/issues/4468
2025-07-28 09:40:36 +02:00
yuri
4053392921 fix(core): amend misc label-related issues (#10044)
* fix(core): amend misc label-related issues

* re-enabled bulk update of label value
* re-enabled merging flow-execution labels by key
* made duplicated keys rejection readable
* forced multiple validations within `RequestUtils`
* ensured existing labels can be overriden
* added multiple tests validating complex scenarios

BREAKING CHANGE: switched from first to last label value override
BREAKING CHANGE: preventing empty key/value labels
BREAKING CHANGE: preventing whitespace in key

* fix(core): reflect feedback

* Deduplicated a list inside the `Labels` task.
* Worked around label mutation at `Worker`.
* Attempted to deduplicate labels within `Execution` as possible.

* fix(core): remove irrelevant changes
2025-07-28 09:38:38 +02:00
Barthélémy Ledoux
8b0483643a feat(flows): code editor can launch playground (#10359) 2025-07-28 09:15:39 +02:00
Piyush Bhaskar
5feeb41c7a fix(core): update state count emission and filter table executions. (#10367) 2025-07-28 12:42:20 +05:30
github-actions[bot]
d7f5e5c05d chore(core): localize to languages other than english (#10368)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-28 09:09:32 +02:00
Aditya
4840f723fc chore(core): properly handle environment name set either via config and through the settings page (#10151)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-28 09:06:34 +02:00
Devesh Kumar
8cf159b281 fix(namespaces): prevent namespace folder highlighting when containing file is selected (#10364)
Closes https://github.com/kestra-io/kestra/issues/10360.
2025-07-28 08:47:25 +02:00
Loïc Mathieu
4c79576113 fix(tests): improve JdbcQueueTest flaky tests 2025-07-25 12:50:27 +02:00
Florian Hussonnois
f87f2ed753 fix(system): avoid potential NPE in ServiceLivenessManager (#10338)
Avoid a potential NPE in ServiceLivenessManager when
a local service is unregistered during shutdown before the liveness probe completes

Fix: #10338
2025-07-25 12:33:18 +02:00
Florian Hussonnois
298a6c7ca8 fix(system): ignore state transition failure for indexer
Fix: kestra-io/kestra-ee#4474
2025-07-25 11:35:09 +02:00
Loïc Mathieu
ab464fff6e fix(executions): flow concurrency limit not honors when executions are created at a high rate
This is due to the fact that we now process the execution queue concurrently so there is a race when counting currently running executions. This can be seen easily using a ForEachItem as it could create tens or hundreds of executions almost instantly leading to almost all those executions started as they would all see 0 executions running...

Using a dedicated execution running queue, as done in EE, would serialize the messages and fix the issue.

However, if using multiple executor instances and concurrency limit = 1, there is a theoretical race as no locks will be done if no execution is running. A max surge of executions could be as high as the number of executor but this race is less probable to happen in real world scenario.

Fixes #10167
2025-07-25 11:35:00 +02:00
Florian Hussonnois
6dcba16314 chore(core): clean QueryFilter class 2025-07-25 11:34:09 +02:00
Barthélémy Ledoux
80a328e87e fix(flows): better loading pattern (#10345) 2025-07-25 10:14:07 +02:00
Loïc Mathieu
f2034f4975 fix(executions): race condition inside nested ForEach with concurrency
Fixes #10167
2025-07-25 09:45:29 +02:00
github-actions[bot]
edca56d168 chore(core): localize to languages other than english (#10341)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-24 23:21:03 +02:00
489 changed files with 15817 additions and 9057 deletions

View File

@@ -2,7 +2,7 @@ name: Auto-Translate UI keys and create PR
on:
schedule:
- cron: "0 9-21 * * *" # Every hour from 9 AM to 9 PM
- cron: "0 9-21/3 * * *" # Every 3 hours from 9 AM to 9 PM
workflow_dispatch:
inputs:
retranslate_modified_keys:
@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
name: Checkout
with:
fetch-depth: 0

View File

@@ -27,7 +27,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.

View File

@@ -1,147 +0,0 @@
name: Create Docker images on Release
on:
workflow_dispatch:
inputs:
retag-latest:
description: 'Retag latest Docker images'
required: true
type: string
default: "false"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag'
required: false
type: string
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
jobs:
plugins:
name: List Plugins
runs-on: ubuntu-latest
outputs:
plugins: ${{ steps.plugins.outputs.plugins }}
steps:
# Checkout
- uses: actions/checkout@v4
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
docker:
name: Publish Docker
needs: [ plugins ]
runs-on: ubuntu-latest
strategy:
matrix:
image:
- name: "-no-plugins"
plugins: ""
packages: jattach
python-libs: ""
- name: ""
plugins: ${{needs.plugins.outputs.plugins}}
packages: python3 python-is-python3 python3-pip curl jattach
python-libs: kestra
steps:
- uses: actions/checkout@v4
# Vars
- name: Set image name
id: vars
run: |
if [[ "${{ inputs.release-tag }}" == "" ]]; then
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
else
TAG="${{ inputs.release-tag }}"
echo "tag=${TAG}" >> $GITHUB_OUTPUT
fi
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# Download release
- name: Download release
uses: robinraju/release-downloader@v1.12
with:
tag: ${{steps.vars.outputs.tag}}
fileName: 'kestra-*'
out-file-path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker setup
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# Docker Build and push
- name: Push to Docker Hub
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
APT_PACKAGES=${{ matrix.image.packages }}
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
- name: Install regctl
if: github.event.inputs.retag-latest == 'true'
uses: regclient/actions/regctl-installer@main
- name: Retag to latest
if: github.event.inputs.retag-latest == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
- docker
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ':github-actions:'
channel: 'C02DQ1A7JLR' # _int_git channel

View File

@@ -19,7 +19,7 @@ on:
default: "no input"
jobs:
check:
timeout-minutes: 10
timeout-minutes: 15
runs-on: ubuntu-latest
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
@@ -32,7 +32,7 @@ jobs:
password: ${{ github.token }}
- name: Checkout kestra
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
path: kestra

View File

@@ -21,12 +21,12 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions

View File

@@ -33,13 +33,13 @@ jobs:
exit 1;
fi
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
path: kestra
# Checkout GitHub Actions
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions

View File

@@ -4,9 +4,8 @@ on:
workflow_dispatch:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: true
description: "plugins version"
required: false
type: string
push:
branches:
@@ -34,7 +33,7 @@ jobs:
if: "!startsWith(github.ref, 'refs/heads/releases')"
uses: ./.github/workflows/workflow-release.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
@@ -43,7 +42,8 @@ jobs:
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
end:
runs-on: ubuntu-latest
needs:

View File

@@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0

View File

@@ -34,7 +34,7 @@ jobs:
fi
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0

View File

@@ -17,12 +17,12 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
@@ -66,12 +66,12 @@ jobs:
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
@@ -111,12 +111,12 @@ jobs:
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions

View File

@@ -29,7 +29,7 @@ jobs:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
name: Checkout - Current ref
with:
fetch-depth: 0

View File

@@ -1,23 +1,7 @@
name: Build Artifacts
on:
workflow_call:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: true
type: string
outputs:
docker-tag:
value: ${{ jobs.build.outputs.docker-tag }}
description: "The Docker image Tag for Kestra"
docker-artifact-name:
value: ${{ jobs.build.outputs.docker-artifact-name }}
description: "The GitHub artifact containing the Kestra docker image name."
plugins:
value: ${{ jobs.build.outputs.plugins }}
description: "The Kestra plugins list used for the build."
workflow_call: {}
jobs:
build:
@@ -31,7 +15,7 @@ jobs:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
fetch-depth: 0
@@ -82,55 +66,6 @@ jobs:
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker Tag
- name: Setup - Docker vars
id: vars
shell: bash
run: |
TAG=${GITHUB_REF#refs/*/}
if [[ $TAG = "master" ]]
then
TAG="latest";
elif [[ $TAG = "develop" ]]
then
TAG="develop";
elif [[ $TAG = v* ]]
then
TAG="${TAG}";
else
TAG="build-${{ github.run_id }}";
fi
echo "tag=${TAG}" >> $GITHUB_OUTPUT
echo "artifact=docker-kestra-${TAG}" >> $GITHUB_OUTPUT
# Docker setup
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Docker - Setup Buildx
uses: docker/setup-buildx-action@v3
# Docker Build
- name: Docker - Build & export image
uses: docker/build-push-action@v6
if: "!startsWith(github.ref, 'refs/tags/v')"
with:
context: .
push: false
file: Dockerfile
tags: |
kestra/kestra:${{ steps.vars.outputs.tag }}
build-args: |
KESTRA_PLUGINS=${{ steps.plugins.outputs.plugins }}
APT_PACKAGES=${{ env.DOCKER_APT_PACKAGES }}
PYTHON_LIBRARIES=${{ env.DOCKER_PYTHON_LIBRARIES }}
outputs: type=docker,dest=/tmp/${{ steps.vars.outputs.artifact }}.tar
# Upload artifacts
- name: Artifacts - Upload JAR
uses: actions/upload-artifact@v4
@@ -143,10 +78,3 @@ jobs:
with:
name: exe
path: build/executable/
- name: Artifacts - Upload Docker
uses: actions/upload-artifact@v4
if: "!startsWith(github.ref, 'refs/tags/v')"
with:
name: ${{ steps.vars.outputs.artifact }}
path: /tmp/${{ steps.vars.outputs.artifact }}.tar

View File

@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v5
- name: Cache Node Modules
id: cache-node-modules

View File

@@ -1,14 +1,17 @@
name: Github - Release
on:
workflow_dispatch:
workflow_call:
secrets:
GH_PERSONAL_TOKEN:
description: "The Github personal token."
required: true
push:
tags:
- '*'
SLACK_RELEASES_WEBHOOK_URL:
description: "The Slack webhook URL."
required: true
jobs:
publish:
@@ -17,14 +20,14 @@ jobs:
steps:
# Check out
- name: Checkout - Repository
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
fetch-depth: 0
submodules: true
# Checkout GitHub Actions
- name: Checkout - Actions
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
repository: kestra-io/actions
sparse-checkout-cone-mode: true
@@ -35,7 +38,7 @@ jobs:
# Download Exec
# Must be done after checkout actions
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
uses: actions/download-artifact@v5
if: startsWith(github.ref, 'refs/tags/v')
with:
name: exe

View File

@@ -1,22 +1,37 @@
name: Publish - Docker
name: Create Docker images on Release
on:
workflow_dispatch:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
retag-latest:
description: 'Retag latest Docker images'
required: true
type: choice
default: "false"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag (by default, deduced with the ref)'
required: false
type: string
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
force-download-artifact:
description: 'Force download artifact'
required: false
type: string
type: choice
default: "true"
options:
- "true"
- "false"
workflow_call:
inputs:
plugin-version:
description: "Kestra version"
description: "Plugin version"
default: 'LATEST'
required: false
type: string
@@ -33,47 +48,93 @@ on:
description: "The Dockerhub password."
required: true
env:
PLUGIN_VERSION: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
jobs:
plugins:
name: List Plugins
runs-on: ubuntu-latest
outputs:
plugins: ${{ steps.plugins.outputs.plugins }}
steps:
# Checkout
- uses: actions/checkout@v5
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins
with: # remap LATEST-SNAPSHOT to LATEST
plugin-version: ${{ env.PLUGIN_VERSION == 'LATEST-SNAPSHOT' && 'LATEST' || env.PLUGIN_VERSION }}
# ********************************************************************************************************************
# Build
# ********************************************************************************************************************
build-artifacts:
name: Build Artifacts
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
if: ${{ inputs.force-download-artifact == 'true' }}
uses: ./.github/workflows/workflow-build-artifacts.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
# ********************************************************************************************************************
# Docker
# ********************************************************************************************************************
publish:
name: Publish - Docker
docker:
name: Publish Docker
needs: [ plugins, build-artifacts ]
if: always()
runs-on: ubuntu-latest
needs: build-artifacts
if: |
always() &&
(needs.build-artifacts.result == 'success' ||
github.event.inputs.force-download-artifact != 'true')
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
strategy:
matrix:
image:
- tag: -no-plugins
- name: "-no-plugins"
plugins: ""
packages: jattach
plugins: false
python-libraries: ""
- tag: ""
plugins: true
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
python-libraries: kestra
python-libs: ""
- name: ""
plugins: ${{needs.plugins.outputs.plugins}}
packages: python3 python-is-python3 python3-pip curl jattach
python-libs: kestra
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
- uses: actions/checkout@v5
# Vars
- name: Set image name
id: vars
run: |
if [[ "${{ inputs.release-tag }}" == "" ]]; then
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
else
TAG="${{ inputs.release-tag }}"
echo "tag=${TAG}" >> $GITHUB_OUTPUT
fi
if [[ $GITHUB_REF == refs/tags/* ]]; then
if [[ $TAG =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
# this will remove the patch version number
MINOR_SEMVER=${TAG%.*}
echo "minor_semver=${MINOR_SEMVER}" >> $GITHUB_OUTPUT
else
echo "Tag '$TAG' is not a valid semver (vMAJOR.MINOR.PATCH), skipping minor_semver"
fi
fi
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# Download executable from artifact
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker setup
- name: Docker - Setup QEMU
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
@@ -81,66 +142,59 @@ jobs:
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Docker - Setup Docker Buildx
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Docker - Login to DockerHub
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# # Get Plugins List
- name: Plugins - Get List
uses: ./.github/actions/plugins-list
id: plugins-list
if: ${{ matrix.image.plugins}}
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
# Vars
- name: Docker - Set variables
shell: bash
id: vars
run: |
TAG=${GITHUB_REF#refs/*/}
PLUGINS="${{ matrix.image.plugins == true && steps.plugins-list.outputs.plugins || '' }}"
if [[ $TAG == v* ]]; then
TAG="${TAG}";
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
elif [[ $TAG = "develop" ]]; then
TAG="develop";
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
else
TAG="build-${{ github.run_id }}";
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
fi
echo "tag=${TAG}${{ matrix.image.tag }}" >> $GITHUB_OUTPUT
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
with:
name: exe
path: build/executable
- name: Docker - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker Build and push
- name: Docker - Build image
- name: Push to Docker Hub
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: kestra/kestra:${{ steps.vars.outputs.tag }}
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
APT_PACKAGES=${{ matrix.image.packages }}
PYTHON_LIBRARIES=${{ matrix.image.python-libraries }}
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
- name: Install regctl
if: startsWith(github.ref, 'refs/tags/v')
uses: regclient/actions/regctl-installer@main
- name: Retag to minor semver version
if: startsWith(github.ref, 'refs/tags/v') && steps.vars.outputs.minor_semver != ''
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.minor_semver, matrix.image.name) }}
- name: Retag to latest
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-latest == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
- docker
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ':github-actions:'
channel: 'C02DQ1A7JLR' # _int_git channel

View File

@@ -25,7 +25,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
uses: actions/checkout@v5
# Setup build
- name: Setup - Build

View File

@@ -0,0 +1,16 @@
name: Pull Request - Delete Docker
on:
pull_request:
types: [closed]
jobs:
publish:
name: Pull Request - Delete Docker
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
runs-on: ubuntu-latest
steps:
- uses: dataaxiom/ghcr-cleanup-action@v1
with:
package: kestra-pr
delete-tags: ${{ github.event.pull_request.number }}

View File

@@ -0,0 +1,78 @@
name: Pull Request - Publish Docker
on:
pull_request:
branches:
- develop
jobs:
build-artifacts:
name: Build Artifacts
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
uses: ./.github/workflows/workflow-build-artifacts.yml
publish:
name: Publish Docker
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
runs-on: ubuntu-latest
needs: build-artifacts
env:
GITHUB_IMAGE_PATH: "ghcr.io/kestra-io/kestra-pr"
steps:
- name: Checkout - Current ref
uses: actions/checkout@v5
with:
fetch-depth: 0
# Docker setup
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Setup Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to GHCR
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Docker - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
- name: Docker - Build image
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile.pr
push: true
tags: ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}
platforms: linux/amd64,linux/arm64
# Add comment on pull request
- name: Add comment to PR
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
await github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: `**🐋 Docker image**: \`${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}\`\n` +
`\n` +
`\`\`\`bash\n` +
`docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }} server local\n` +
`\`\`\``
})

View File

@@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
plugin-version:
description: "Kestra version"
description: "plugins version"
default: 'LATEST'
required: false
type: string
@@ -16,7 +16,7 @@ on:
workflow_call:
inputs:
plugin-version:
description: "Kestra version"
description: "plugins version"
default: 'LATEST'
required: false
type: string
@@ -42,21 +42,25 @@ on:
SONATYPE_GPG_FILE:
description: "The Sonatype GPG file."
required: true
GH_PERSONAL_TOKEN:
description: "GH personnal Token."
required: true
SLACK_RELEASES_WEBHOOK_URL:
description: "Slack webhook for releases channel."
required: true
jobs:
build-artifacts:
name: Build - Artifacts
uses: ./.github/workflows/workflow-build-artifacts.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
Docker:
name: Publish Docker
needs: build-artifacts
uses: ./.github/workflows/workflow-publish-docker.yml
if: startsWith(github.ref, 'refs/heads/develop') || github.event.inputs.publish-docker == 'true'
if: github.ref == 'refs/heads/develop' || inputs.publish-docker == 'true'
with:
force-download-artifact: 'false'
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
plugin-version: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
@@ -77,4 +81,5 @@ jobs:
if: startsWith(github.ref, 'refs/tags/v')
uses: ./.github/workflows/workflow-github-release.yml
secrets:
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}

View File

@@ -27,7 +27,7 @@ jobs:
ui: ${{ steps.changes.outputs.ui }}
backend: ${{ steps.changes.outputs.backend }}
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
if: "!startsWith(github.ref, 'refs/tags/v')"
- uses: dorny/paths-filter@v3
if: "!startsWith(github.ref, 'refs/tags/v')"

View File

@@ -19,6 +19,7 @@
#plugin-databricks:io.kestra.plugin:plugin-databricks:LATEST
#plugin-datahub:io.kestra.plugin:plugin-datahub:LATEST
#plugin-dataform:io.kestra.plugin:plugin-dataform:LATEST
#plugin-datagen:io.kestra.plugin:plugin-datagen:LATEST
#plugin-dbt:io.kestra.plugin:plugin-dbt:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-db2:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-mongodb:LATEST
@@ -26,6 +27,7 @@
#plugin-debezium:io.kestra.plugin:plugin-debezium-oracle:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-postgres:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-sqlserver:LATEST
#plugin-deepseek:io.kestra.plugin:plugin-deepseek:LATEST
#plugin-docker:io.kestra.plugin:plugin-docker:LATEST
#plugin-elasticsearch:io.kestra.plugin:plugin-elasticsearch:LATEST
#plugin-fivetran:io.kestra.plugin:plugin-fivetran:LATEST
@@ -86,13 +88,18 @@
#plugin-powerbi:io.kestra.plugin:plugin-powerbi:LATEST
#plugin-pulsar:io.kestra.plugin:plugin-pulsar:LATEST
#plugin-redis:io.kestra.plugin:plugin-redis:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-bun:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-deno:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-go:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-groovy:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-jbang:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-julia:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-jython:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-lua:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-nashorn:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-node:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-perl:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-php:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-powershell:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-python:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-r:LATEST

305
AGENTS.md Normal file
View File

@@ -0,0 +1,305 @@
# Kestra AGENTS.md
This file provides guidance for AI coding agents working on the Kestra project. Kestra is an open-source data orchestration and scheduling platform built with Java (Micronaut) and Vue.js.
## Repository Layout
- **`core/`**: Core Kestra framework and task definitions
- **`cli/`**: Command-line interface and server implementation
- **`webserver/`**: REST API server implementation
- **`ui/`**: Vue.js frontend application
- **`jdbc-*`**: Database connector modules (H2, MySQL, PostgreSQL)
- **`script/`**: Script execution engine
- **`storage-local/`**: Local file storage implementation
- **`repository-memory/`**: In-memory repository implementation
- **`runner-memory/`**: In-memory execution runner
- **`processor/`**: Task processing engine
- **`model/`**: Data models and Data Transfer Objects
- **`platform/`**: Platform-specific implementations
- **`tests/`**: Integration test framework
- **`e2e-tests/`**: End-to-end testing suite
## Development Environment
### Prerequisites
- Java 21+
- Node.js 22+ and npm
- Python 3, pip, and python venv
- Docker & Docker Compose
- Gradle (wrapper included)
### Quick Setup with Devcontainer
The easiest way to get started is using the provided devcontainer:
1. Install VSCode Remote Development extension
2. Run `Dev Containers: Open Folder in Container...` from command palette
3. Select the Kestra root folder
4. Wait for Gradle build to complete
### Manual Setup
1. Clone the repository
2. Run `./gradlew build` to build the backend
3. Navigate to `ui/` and run `npm install`
4. Create configuration files as described below
## Configuration Files
### Backend Configuration
Create `cli/src/main/resources/application-override.yml`:
**Local Mode (H2 database):**
```yaml
micronaut:
server:
cors:
enabled: true
configurations:
all:
allowedOrigins:
- http://localhost:5173
```
**Standalone Mode (PostgreSQL):**
```yaml
kestra:
repository:
type: postgres
storage:
type: local
local:
base-path: "/app/storage"
queue:
type: postgres
tasks:
tmp-dir:
path: /tmp/kestra-wd/tmp
anonymous-usage-report:
enabled: false
datasources:
postgres:
url: jdbc:postgresql://host.docker.internal:5432/kestra
driverClassName: org.postgresql.Driver
username: kestra
password: k3str4
flyway:
datasources:
postgres:
enabled: true
locations:
- classpath:migrations/postgres
ignore-migration-patterns: "*:missing,*:future"
out-of-order: true
micronaut:
server:
cors:
enabled: true
configurations:
all:
allowedOrigins:
- http://localhost:5173
```
### Frontend Configuration
Create `ui/.env.development.local` for environment variables.
## Running the Application
### Backend
- **Local mode**: `./gradlew runLocal` (uses H2 database)
- **Standalone mode**: Use VSCode Run and Debug with main class `io.kestra.cli.App` and args `server standalone`
### Frontend
- Navigate to `ui/` directory
- Run `npm run dev` for development server (port 5173)
- Run `npm run build` for production build
## Building and Testing
### Backend
```bash
# Build the project
./gradlew build
# Run tests
./gradlew test
# Run specific module tests
./gradlew :core:test
# Clean build
./gradlew clean build
```
### Frontend
```bash
cd ui
npm install
npm run test
npm run lint
npm run build
```
### End-to-End Tests
```bash
# Build and start E2E tests
./build-and-start-e2e-tests.sh
# Or use the Makefile
make install
make install-plugins
make start-standalone-postgres
```
## Development Guidelines
### Java Backend
- Use Java 21 features
- Follow Micronaut framework patterns
- Add Swagger annotations for API documentation
- Use annotation processors (enable in IDE)
- Set `MICRONAUT_ENVIRONMENTS=local,override` for custom config
- Set `KESTRA_PLUGINS_PATH` for custom plugin loading
### Vue.js Frontend
- Vue 3 with Composition API
- TypeScript for type safety
- Vite for build tooling
- ESLint and Prettier for code quality
- Component-based architecture in `src/components/`
### Code Style
- Follow `.editorconfig` settings
- Use 4 spaces for Java, 2 spaces for YAML/JSON/CSS
- Enable format on save in VSCode
- Use Prettier for frontend code formatting
## Testing Strategy
### Backend Testing
- Unit tests in `src/test/java/`
- Integration tests in `tests/` module
- Use Micronaut test framework
- Test both local and standalone modes
### Frontend Testing
- Unit tests with Jest
- E2E tests with Playwright
- Component testing with Storybook
- Run `npm run test:unit` and `npm run test:e2e`
## Plugin Development
### Creating Plugins
- Follow the [Plugin Developer Guide](https://kestra.io/docs/plugin-developer-guide/)
- Place JAR files in `KESTRA_PLUGINS_PATH`
- Use the plugin template structure
- Test with both local and standalone modes
### Plugin Loading
- Set `KESTRA_PLUGINS_PATH` environment variable
- Use devcontainer mounts for local development
- Plugins are loaded at startup
## Common Issues and Solutions
### JavaScript Heap Out of Memory
Set `NODE_OPTIONS=--max-old-space-size=4096` environment variable.
### CORS Issues
Ensure backend CORS is configured for `http://localhost:5173` when using frontend dev server.
### Database Connection Issues
- Use `host.docker.internal` instead of `localhost` when connecting from devcontainer
- Verify PostgreSQL is running and accessible
- Check database credentials and permissions
### Gradle Build Issues
- Clear Gradle cache: `./gradlew clean`
- Check Java version compatibility
- Verify all dependencies are available
## Pull Request Guidelines
### Before Submitting
1. Run all tests: `./gradlew test` and `npm test`
2. Check code formatting: `./gradlew spotlessCheck`
3. Verify CORS configuration if changing API
4. Test both local and standalone modes
5. Update documentation for user-facing changes
### Commit Messages
- Follow conventional commit format
- Use present tense ("Add feature" not "Added feature")
- Reference issue numbers when applicable
- Keep commits focused and atomic
### Review Checklist
- [ ] All tests pass
- [ ] Code follows project style guidelines
- [ ] Documentation is updated
- [ ] No breaking changes without migration guide
- [ ] CORS properly configured if API changes
- [ ] Both local and standalone modes tested
## Useful Commands
```bash
# Quick development commands
./gradlew runLocal # Start local backend
./gradlew :ui:build # Build frontend
./gradlew clean build # Clean rebuild
npm run dev # Start frontend dev server
make install # Install Kestra locally
make start-standalone-postgres # Start with PostgreSQL
# Testing commands
./gradlew test # Run all backend tests
./gradlew :core:test # Run specific module tests
npm run test # Run frontend tests
npm run lint # Lint frontend code
```
## Getting Help
- Open a [GitHub issue](https://github.com/kestra-io/kestra/issues)
- Join the [Kestra Slack community](https://kestra.io/slack)
- Check the [main documentation](https://kestra.io/docs)
## Environment Variables
| Variable | Description | Default |
|----------|-------------|---------|
| `MICRONAUT_ENVIRONMENTS` | Custom config environments | `local,override` |
| `KESTRA_PLUGINS_PATH` | Path to custom plugins | `/workspaces/kestra/local/plugins` |
| `NODE_OPTIONS` | Node.js options | `--max-old-space-size=4096` |
| `JAVA_HOME` | Java installation path | `/usr/java/jdk-21` |
Remember: Always test your changes in both local and standalone modes, and ensure CORS is properly configured for frontend development.

7
Dockerfile.pr Normal file
View File

@@ -0,0 +1,7 @@
FROM kestra/kestra:develop
USER root
COPY --chown=kestra:kestra docker /
USER kestra

View File

@@ -65,10 +65,6 @@ Kestra is an open-source, event-driven orchestration platform that makes both **
## 🚀 Quick Start
### Try the Live Demo
Try Kestra with our [**Live Demo**](https://demo.kestra.io/ui/login?auto). No installation required!
### Get Started Locally in 5 Minutes
#### Launch Kestra in Docker

View File

@@ -7,7 +7,7 @@ set -e
# run tests on this image
LOCAL_IMAGE_VERSION="local-e2e"
LOCAL_IMAGE_VERSION="local-e2e-$(date +%s)"
echo "Running E2E"
echo "Start time: $(date '+%Y-%m-%d %H:%M:%S')"
@@ -15,6 +15,7 @@ start_time=$(date +%s)
echo ""
echo "Building the image for this current repository"
make clean
make build-docker VERSION=$LOCAL_IMAGE_VERSION
end_time=$(date +%s)
@@ -32,7 +33,7 @@ echo "npm i"
npm i
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
end_time2=$(date +%s)
elapsed2=$(( end_time2 - start_time2 ))

View File

@@ -16,7 +16,7 @@ plugins {
id "java"
id 'java-library'
id "idea"
id "com.gradleup.shadow" version "8.3.8"
id "com.gradleup.shadow" version "8.3.9"
id "application"
// test
@@ -225,14 +225,14 @@ subprojects {
}
testlogger {
theme 'mocha-parallel'
showExceptions true
showFullStackTraces true
showCauses true
slowThreshold 2000
showStandardStreams true
showPassedStandardStreams false
showSkippedStandardStreams true
theme = 'mocha-parallel'
showExceptions = true
showFullStackTraces = true
showCauses = true
slowThreshold = 2000
showStandardStreams = true
showPassedStandardStreams = false
showSkippedStandardStreams = true
}
}
}
@@ -410,7 +410,7 @@ jar {
shadowJar {
archiveClassifier.set(null)
mergeServiceFiles()
zip64 true
zip64 = true
}
distZip.dependsOn shadowJar
@@ -427,8 +427,8 @@ def executableDir = layout.buildDirectory.dir("executable")
def executable = layout.buildDirectory.file("executable/${project.name}-${project.version}").get().asFile
tasks.register('writeExecutableJar') {
group "build"
description "Write an executable jar from shadow jar"
group = "build"
description = "Write an executable jar from shadow jar"
dependsOn = [shadowJar]
final shadowJarFile = tasks.shadowJar.outputs.files.singleFile
@@ -454,8 +454,8 @@ tasks.register('writeExecutableJar') {
}
tasks.register('executableJar', Zip) {
group "build"
description "Zip the executable jar"
group = "build"
description = "Zip the executable jar"
dependsOn = [writeExecutableJar]
archiveFileName = "${project.name}-${project.version}.zip"
@@ -620,6 +620,28 @@ subprojects {subProject ->
}
}
}
if (subProject.name != 'platform' && subProject.name != 'cli') {
// only if a test source set actually exists (avoids empty artifacts)
def hasTests = subProject.extensions.findByName('sourceSets')?.findByName('test') != null
if (hasTests) {
// wire the artifact onto every Maven publication of this subproject
publishing {
publications {
withType(MavenPublication).configureEach { pub ->
// keep the normal java component + sources/javadoc already configured
pub.artifact(subProject.tasks.named('testsJar').get())
}
}
}
// make sure publish tasks build the tests jar first
tasks.matching { it.name.startsWith('publish') }.configureEach {
dependsOn subProject.tasks.named('testsJar')
}
}
}
}
}

View File

@@ -16,6 +16,6 @@ abstract public class AbstractServerCommand extends AbstractCommand implements S
}
protected static int defaultWorkerThread() {
return Runtime.getRuntime().availableProcessors() * 4;
return Runtime.getRuntime().availableProcessors() * 8;
}
}

View File

@@ -48,7 +48,7 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
private String tenantId;
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to four times the number of available processors. Set it to 0 to avoid starting a worker.")
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to eight times the number of available processors. Set it to 0 to avoid starting a worker.")
private int workerThread = defaultWorkerThread();
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")

View File

@@ -22,7 +22,7 @@ public class WorkerCommand extends AbstractServerCommand {
@Inject
private ApplicationContext applicationContext;
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to four times the number of available processors")
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to eight times the number of available processors")
private int thread = defaultWorkerThread();
@Option(names = {"-g", "--worker-group"}, description = "The worker group key, must match the regex [a-zA-Z0-9_-]+ (EE only)")

View File

@@ -10,24 +10,21 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
@Singleton
@Slf4j
@Requires(property = "micronaut.io.watch.enabled", value = "true")
@@ -49,13 +46,9 @@ public class FileChangedEventListener {
@Inject
protected FlowListenersInterface flowListeners;
@Nullable
@Value("${micronaut.io.watch.tenantId}")
private String tenantId;
FlowFilesManager flowFilesManager;
private List<FlowWithPath> flows = new ArrayList<>();
private List<FlowWithPath> flows = new CopyOnWriteArrayList<>();
private boolean isStarted = false;
@@ -113,8 +106,6 @@ public class FileChangedEventListener {
}
public void startListening(List<Path> paths) throws IOException, InterruptedException {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
for (Path path : paths) {
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
}
@@ -157,12 +148,20 @@ public class FileChangedEventListener {
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
}
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(filePath), content));
log.info("Flow {} from file {} has been created or modified", flow.get().getId(), entry);
}
} catch (NoSuchFileException e) {
log.error("File not found: {}", entry, e);
log.warn("File not found: {}, deleting it", entry, e);
// the file might have been deleted while reading so if not found we try to delete the flow
flows.stream()
.filter(flow -> flow.getPath().equals(filePath.toString()))
.findFirst()
.ifPresent(flowWithPath -> {
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
});
} catch (IOException e) {
log.error("Error reading file: {}", entry, e);
}
@@ -193,8 +192,6 @@ public class FileChangedEventListener {
}
private void loadFlowsFromFolder(Path folder) {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
try {
Files.walkFileTree(folder, new SimpleFileVisitor<Path>() {
@Override
@@ -214,7 +211,7 @@ public class FileChangedEventListener {
if (flow.isPresent() && flows.stream().noneMatch(flowWithPath -> flowWithPath.uidWithoutRevision().equals(flow.get().uidWithoutRevision()))) {
flows.add(FlowWithPath.of(flow.get(), file.toString()));
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(file), content));
}
}
return FileVisitResult.CONTINUE;
@@ -238,10 +235,8 @@ public class FileChangedEventListener {
}
private Optional<FlowWithSource> parseFlow(String content, Path entry) {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
try {
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(tenantId, content, false);
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(getTenantIdFromPath(entry), content, false);
modelValidator.validate(flow);
return Optional.of(flow);
} catch (ConstraintViolationException | FlowProcessingException e) {
@@ -265,4 +260,8 @@ public class FileChangedEventListener {
private Path buildPath(FlowInterface flow) {
return fileWatchConfiguration.getPaths().getFirst().resolve(flow.uidWithoutRevision() + ".yml");
}
private String getTenantIdFromPath(Path path) {
return path.getFileName().toString().split("_")[0];
}
}

View File

@@ -212,7 +212,7 @@ kestra:
retention: 30d
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/usages
uri: https://api.kestra.io/v1/server-events/
initial-delay: 5m
fixed-delay: 1h

View File

@@ -1,6 +1,7 @@
package io.kestra.cli.services;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
@@ -71,7 +72,9 @@ class FileChangedEventListenerTest {
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀
""";
Files.write(Path.of(FILE_WATCH + "/myflow.yaml"), flow.getBytes());
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, flow);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), flow.getBytes());
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isPresent(),
Duration.ofMillis(100),
@@ -83,7 +86,7 @@ class FileChangedEventListenerTest {
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
// delete the flow
Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isEmpty(),
Duration.ofMillis(100),
@@ -110,7 +113,8 @@ class FileChangedEventListenerTest {
values:
message: Hello World!
""";
Files.write(Path.of(FILE_WATCH + "/plugin-default.yaml"), pluginDefault.getBytes());
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, pluginDefault);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), pluginDefault.getBytes());
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isPresent(),
Duration.ofMillis(100),
@@ -122,7 +126,7 @@ class FileChangedEventListenerTest {
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
// delete both files
Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
Duration.ofMillis(100),

View File

@@ -53,6 +53,8 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static io.kestra.core.docs.AbstractClassDocumentation.flattenWithoutType;
import static io.kestra.core.docs.AbstractClassDocumentation.required;
import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@@ -92,12 +94,16 @@ public class JsonSchemaGenerator {
}
public <T> Map<String, Object> schemas(Class<? extends T> cls, boolean arrayOf, List<String> allowedPluginTypes) {
return this.schemas(cls, arrayOf, allowedPluginTypes, false);
}
public <T> Map<String, Object> schemas(Class<? extends T> cls, boolean arrayOf, List<String> allowedPluginTypes, boolean withOutputs) {
SchemaGeneratorConfigBuilder builder = new SchemaGeneratorConfigBuilder(
SchemaVersion.DRAFT_7,
OptionPreset.PLAIN_JSON
);
this.build(builder, true, allowedPluginTypes);
this.build(builder, true, allowedPluginTypes, withOutputs);
SchemaGeneratorConfig schemaGeneratorConfig = builder.build();
@@ -122,12 +128,13 @@ public class JsonSchemaGenerator {
if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
.map(JsonNode::asText)
.toList();
.collect(Collectors.toList());
properties.fields().forEachRemaining(e -> {
int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
requiredPropsNode.remove(indexInRequiredArray);
requiredFieldValues.remove(indexInRequiredArray);
}
});
@@ -248,6 +255,10 @@ public class JsonSchemaGenerator {
}
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7, List<String> allowedPluginTypes) {
this.build(builder, draft7, allowedPluginTypes, false);
}
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7, List<String> allowedPluginTypes, boolean withOutputs) {
// builder.withObjectMapper(builder.getObjectMapper().configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false));
builder
.with(new JakartaValidationModule(
@@ -429,6 +440,13 @@ public class JsonSchemaGenerator {
if (pluginAnnotation.beta()) {
collectedTypeAttributes.put("$beta", true);
}
if (withOutputs) {
Map<String, Object> outputsSchema = this.outputs(null, scope.getType().getErasedType());
collectedTypeAttributes.set("outputs", context.getGeneratorConfig().createObjectNode().pojoNode(
flattenWithoutType(AbstractClassDocumentation.properties(outputsSchema), required(outputsSchema))
));
}
}
// handle deprecated tasks

View File

@@ -1,11 +1,10 @@
package io.kestra.core.models;
import io.kestra.core.utils.MapUtils;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
public record Label(@NotNull String key, @NotNull String value) {
@@ -29,11 +28,36 @@ public record Label(@NotNull String key, @NotNull String value) {
* @return the nested {@link Map}.
*/
public static Map<String, Object> toNestedMap(List<Label> labels) {
Map<String, Object> asMap = labels.stream()
return MapUtils.flattenToNestedMap(toMap(labels));
}
/**
* Static helper method for converting a list of labels to a flat map.
* Key order is kept.
*
* @param labels The list of {@link Label} to be converted.
* @return the flat {@link Map}.
*/
public static Map<String, String> toMap(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyMap();
return labels.stream()
.filter(label -> label.value() != null && label.key() != null)
// using an accumulator in case labels with the same key exists: the first is kept
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> first));
return MapUtils.flattenToNestedMap(asMap);
// using an accumulator in case labels with the same key exists: the second is kept
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
}
/**
* Static helper method for deduplicating a list of labels by their key.
* Value of the last key occurrence is kept.
*
* @param labels The list of {@link Label} to be deduplicated.
* @return the deduplicated {@link List}.
*/
public static List<Label> deduplicate(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyList();
return toMap(labels).entrySet().stream()
.map(entry -> new Label(entry.getKey(), entry.getValue()))
.collect(Collectors.toCollection(ArrayList::new));
}
/**

View File

@@ -6,9 +6,9 @@ import com.fasterxml.jackson.annotation.JsonValue;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.models.dashboards.filters.*;
import io.kestra.core.utils.Enums;
import java.util.ArrayList;
import lombok.Builder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -49,42 +49,27 @@ public record QueryFilter(
PREFIX
}
@SuppressWarnings("unchecked")
private List<Object> asValues(Object value) {
return value instanceof String valueStr ? Arrays.asList(valueStr.split(",")) : (List<Object>) value;
}
@SuppressWarnings("unchecked")
public <T extends Enum<T>> AbstractFilter<T> toDashboardFilterBuilder(T field, Object value) {
switch (this.operation) {
case EQUALS:
return EqualTo.<T>builder().field(field).value(value).build();
case NOT_EQUALS:
return NotEqualTo.<T>builder().field(field).value(value).build();
case GREATER_THAN:
return GreaterThan.<T>builder().field(field).value(value).build();
case LESS_THAN:
return LessThan.<T>builder().field(field).value(value).build();
case GREATER_THAN_OR_EQUAL_TO:
return GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
case LESS_THAN_OR_EQUAL_TO:
return LessThanOrEqualTo.<T>builder().field(field).value(value).build();
case IN:
return In.<T>builder().field(field).values(asValues(value)).build();
case NOT_IN:
return NotIn.<T>builder().field(field).values(asValues(value)).build();
case STARTS_WITH:
return StartsWith.<T>builder().field(field).value(value.toString()).build();
case ENDS_WITH:
return EndsWith.<T>builder().field(field).value(value.toString()).build();
case CONTAINS:
return Contains.<T>builder().field(field).value(value.toString()).build();
case REGEX:
return Regex.<T>builder().field(field).value(value.toString()).build();
case PREFIX:
return Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
default:
throw new IllegalArgumentException("Unsupported operation: " + this.operation);
}
return switch (this.operation) {
case EQUALS -> EqualTo.<T>builder().field(field).value(value).build();
case NOT_EQUALS -> NotEqualTo.<T>builder().field(field).value(value).build();
case GREATER_THAN -> GreaterThan.<T>builder().field(field).value(value).build();
case LESS_THAN -> LessThan.<T>builder().field(field).value(value).build();
case GREATER_THAN_OR_EQUAL_TO -> GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
case LESS_THAN_OR_EQUAL_TO -> LessThanOrEqualTo.<T>builder().field(field).value(value).build();
case IN -> In.<T>builder().field(field).values(asValues(value)).build();
case NOT_IN -> NotIn.<T>builder().field(field).values(asValues(value)).build();
case STARTS_WITH -> StartsWith.<T>builder().field(field).value(value.toString()).build();
case ENDS_WITH -> EndsWith.<T>builder().field(field).value(value.toString()).build();
case CONTAINS -> Contains.<T>builder().field(field).value(value.toString()).build();
case REGEX -> Regex.<T>builder().field(field).value(value.toString()).build();
case PREFIX -> Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
};
}
public enum Field {
@@ -154,6 +139,12 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
EXECUTION_ID("executionId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
CHILD_FILTER("childFilter") {
@Override
public List<Op> supportedOp() {
@@ -228,7 +219,7 @@ public record QueryFilter(
@Override
public List<Field> supportedField() {
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.START_DATE,
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL, Field.EXECUTION_ID
);
}
},

View File

@@ -62,6 +62,7 @@ public record ServiceUsage(
List<DailyServiceStatistics> statistics = Arrays
.stream(ServiceType.values())
.filter(it -> !it.equals(ServiceType.INVALID))
.map(type -> of(from, to, repository, type, interval))
.toList();
return new ServiceUsage(statistics);

View File

@@ -1,74 +0,0 @@
package io.kestra.core.models.collectors;
import io.kestra.core.models.ServerType;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
import java.time.Instant;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Set;
@SuperBuilder(toBuilder = true)
@Getter
@Jacksonized
@Introspected
@AllArgsConstructor
public class Usage {
@NotNull
private final String uuid;
@NotNull
private final String startUuid;
@NotNull
private final String instanceUuid;
@NotNull
private final ServerType serverType;
@NotNull
private final String version;
@NotNull
private final ZoneId zoneId;
@Nullable
private final String uri;
@Nullable
private final Set<String> environments;
@NotNull
private final Instant startTime;
@Valid
private final HostUsage host;
@Valid
private final ConfigurationUsage configurations;
@Valid
private final List<PluginUsage> plugins;
@Valid
private final FlowUsage flows;
@Valid
private final ExecutionUsage executions;
@Valid
@Nullable
private ServiceUsage services;
@Valid
@Nullable
private List<PluginMetric> pluginMetrics;
}

View File

@@ -25,6 +25,7 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.LabelService;
import io.kestra.core.test.flow.TaskFixture;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
@@ -131,12 +132,12 @@ public class Execution implements DeletedInterface, TenantInterface {
* @param labels The Flow labels.
* @return a new {@link Execution}.
*/
public static Execution newExecution(final Flow flow, final List<Label> labels) {
public static Execution newExecution(final FlowInterface flow, final List<Label> labels) {
return newExecution(flow, null, labels, Optional.empty());
}
public List<Label> getLabels() {
return Optional.ofNullable(this.labels).orElse(new ArrayList<>());
return ListUtils.emptyOnNull(this.labels);
}
/**
@@ -181,8 +182,22 @@ public class Execution implements DeletedInterface, TenantInterface {
}
/**
* Customization of Lombok-generated builder.
*/
public static class ExecutionBuilder {
/**
* Enforce unique values of {@link Label} when using the builder.
*
* @param labels The labels.
* @return Deduplicated labels.
*/
public ExecutionBuilder labels(List<Label> labels) {
this.labels = Label.deduplicate(labels);
return this;
}
void prebuild() {
this.originalId = this.id;
this.metadata = ExecutionMetadata.builder()
@@ -231,7 +246,6 @@ public class Execution implements DeletedInterface, TenantInterface {
}
public Execution withLabels(List<Label> labels) {
return new Execution(
this.tenantId,
this.id,
@@ -241,7 +255,7 @@ public class Execution implements DeletedInterface, TenantInterface {
this.taskRunList,
this.inputs,
this.outputs,
labels,
Label.deduplicate(labels),
this.variables,
this.state,
this.parentId,
@@ -400,7 +414,7 @@ public class Execution implements DeletedInterface, TenantInterface {
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedErrors finally tasks
* @param resolvedFinally finally tasks
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(
@@ -1026,6 +1040,16 @@ public class Execution implements DeletedInterface, TenantInterface {
return result;
}
/**
* Find all children of this {@link TaskRun}.
*/
public List<TaskRun> findChildren(TaskRun parentTaskRun) {
return taskRunList.stream()
.filter(taskRun -> parentTaskRun.getId().equals(taskRun.getParentTaskRunId()))
.toList();
}
public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
return (withCurrent ?
Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :

View File

@@ -38,6 +38,8 @@ public abstract class AbstractFlow implements FlowInterface {
@Min(value = 1)
Integer revision;
String description;
@Valid
List<Input<?>> inputs;

View File

@@ -61,13 +61,10 @@ public class Flow extends AbstractFlow implements HasUID {
}
});
String description;
Map<String, Object> variables;
@Valid
@NotEmpty
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
List<Task> tasks;
@@ -125,7 +122,7 @@ public class Flow extends AbstractFlow implements HasUID {
AbstractRetry retry;
@Valid
@PluginProperty(beta = true)
@PluginProperty
List<SLA> sla;
public Stream<String> allTypes() {

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import java.util.Optional;
@@ -57,6 +58,7 @@ public interface FlowId {
@Getter
@AllArgsConstructor
@EqualsAndHashCode
class Default implements FlowId {
private final String tenantId;
private final String namespace;

View File

@@ -31,6 +31,8 @@ public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface
Pattern YAML_REVISION_MATCHER = Pattern.compile("(?m)^revision: \\d+\n?");
String getDescription();
boolean isDisabled();
boolean isDeleted();

View File

@@ -4,6 +4,8 @@ import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
@@ -16,6 +18,8 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.function.Function;
@SuppressWarnings("deprecation")
@SuperBuilder
@Getter
@@ -78,7 +82,7 @@ public abstract class Input<T> implements Data {
@Schema(
title = "The default value to use if no value is specified."
)
T defaults;
Property<T> defaults;
@Schema(
title = "The display name of the input."

View File

@@ -43,4 +43,11 @@ public class Output implements Data {
Type type;
String displayName;
/**
* Specifies whether the output is required or not.
* <p>
* By default, an output is always required.
*/
Boolean required;
}

View File

@@ -116,7 +116,7 @@ public class State {
}
public Instant maxDate() {
if (this.histories.size() == 0) {
if (this.histories.isEmpty()) {
return Instant.now();
}
@@ -124,7 +124,7 @@ public class State {
}
public Instant minDate() {
if (this.histories.size() == 0) {
if (this.histories.isEmpty()) {
return Instant.now();
}
@@ -173,6 +173,11 @@ public class State {
return this.current.isBreakpoint();
}
@JsonIgnore
public boolean isQueued() {
return this.current.isQueued();
}
@JsonIgnore
public boolean isRetrying() {
return this.current.isRetrying();
@@ -206,6 +211,14 @@ public class State {
return this.histories.get(this.histories.size() - 2).state.isPaused();
}
/**
* Return true if the execution has failed, then was restarted.
* This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
*/
public boolean failedThenRestarted() {
return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
}
@Introspected
public enum Type {
CREATED,
@@ -264,6 +277,10 @@ public class State {
return this == Type.KILLED;
}
public boolean isQueued(){
return this == Type.QUEUED;
}
/**
* @return states that are terminal to an execution
*/

View File

@@ -20,9 +20,8 @@ public class FileInput extends Input<URI> {
private static final String DEFAULT_EXTENSION = ".upl";
@Builder.Default
@Deprecated(since = "0.24", forRemoval = true)
public String extension = DEFAULT_EXTENSION;
public String extension;
@Override
public void validate(URI input) throws ConstraintViolationException {
@@ -33,6 +32,7 @@ public class FileInput extends Input<URI> {
String res = inputs.stream()
.filter(in -> in instanceof FileInput)
.filter(in -> in.getId().equals(fileName))
.filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
.map(flowInput -> ((FileInput) flowInput).getExtension())
.findFirst()
.orElse(FileInput.DEFAULT_EXTENSION);

View File

@@ -6,19 +6,21 @@ import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
/**
* Represents a
* Represents an input along with its associated value and validation state.
*
* @param input The flow's {@link Input}.
* @param value The flow's input value/data.
* @param enabled Specify whether the input is enabled.
* @param exception The input validation exception.
* @param input The {@link Input} definition of the flow.
* @param value The provided value for the input.
* @param enabled {@code true} if the input is enabled; {@code false} otherwise.
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
* @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
*/
public record InputAndValue(
Input<?> input,
Object value,
boolean enabled,
boolean isDefault,
ConstraintViolationException exception) {
/**
* Creates a new {@link InputAndValue} instance.
*
@@ -26,6 +28,6 @@ public record InputAndValue(
* @param value The value.
*/
public InputAndValue(@NotNull Input<?> input, @Nullable Object value) {
this(input, value, true, null);
this(input, value, true, false, null);
}
}

View File

@@ -68,6 +68,19 @@ public class Property<T> {
String getExpression() {
return expression;
}
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
* <p>
* The returned property will still cache its rendered result.
* To re-evaluate on a subsequent render, call {@code skipCache()} again.
*
* @return a new {@link Property} without a pre-rendered value
*/
public Property<T> skipCache() {
return Property.ofExpression(expression);
}
/**
* Build a new Property object with a value already set.<br>
@@ -132,8 +145,8 @@ public class Property<T> {
*
* @see io.kestra.core.runners.RunContextProperty#as(Class)
*/
public static <T> T as(Property<T> property, RunContext runContext, Class<T> clazz) throws IllegalVariableEvaluationException {
return as(property, runContext, clazz, Map.of());
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
return as(property, context, clazz, Map.of());
}
/**
@@ -143,9 +156,9 @@ public class Property<T> {
*
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
*/
public static <T> T as(Property<T> property, RunContext runContext, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
String rendered = runContext.render(property.expression, variables);
String rendered = context.render(property.expression, variables);
property.value = MAPPER.convertValue(rendered, clazz);
}
@@ -159,8 +172,8 @@ public class Property<T> {
*
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
*/
public static <T, I> T asList(Property<T> property, RunContext runContext, Class<I> itemClazz) throws IllegalVariableEvaluationException {
return asList(property, runContext, itemClazz, Map.of());
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
return asList(property, context, itemClazz, Map.of());
}
/**
@@ -171,7 +184,7 @@ public class Property<T> {
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
*/
@SuppressWarnings("unchecked")
public static <T, I> T asList(Property<T> property, RunContext runContext, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
try {
@@ -179,7 +192,7 @@ public class Property<T> {
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
// Doing that allows us to, if it's an expression, first render then read it as a list.
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
property.value = MAPPER.readValue(runContext.render(property.expression, variables), type);
property.value = MAPPER.readValue(context.render(property.expression, variables), type);
}
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
else {
@@ -187,9 +200,9 @@ public class Property<T> {
property.value = (T) asRawList.stream()
.map(throwFunction(item -> {
if (item instanceof String str) {
return MAPPER.convertValue(runContext.render(str, variables), itemClazz);
return MAPPER.convertValue(context.render(str, variables), itemClazz);
} else if (item instanceof Map map) {
return MAPPER.convertValue(runContext.render(map, variables), itemClazz);
return MAPPER.convertValue(context.render(map, variables), itemClazz);
}
return item;
}))

View File

@@ -0,0 +1,38 @@
package io.kestra.core.models.property;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.VariableRenderer;
import java.util.Map;
/**
* Contextual object for rendering properties.
*
* @see Property
*/
public interface PropertyContext {
String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException;
Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException;
/**
* Static helper method for creating a new {@link PropertyContext} from a given {@link VariableRenderer}.
*
* @param renderer the {@link VariableRenderer}.
* @return a new {@link PropertyContext}.
*/
static PropertyContext create(final VariableRenderer renderer) {
return new PropertyContext() {
@Override
public String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return renderer.render(inline, variables);
}
@Override
public Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return renderer.render(inline, variables);
}
};
}
}

View File

@@ -222,6 +222,7 @@ public class Trigger extends TriggerContext implements HasUID {
}
// If trigger is a schedule and execution ended after the next execution date
else if (abstractTrigger instanceof Schedule schedule &&
this.getNextExecutionDate() != null &&
execution.getState().getEndDate().get().isAfter(this.getNextExecutionDate().toInstant())
) {
RecoverMissedSchedules recoverMissedSchedules = Optional.ofNullable(schedule.getRecoverMissedSchedules())

View File

@@ -5,11 +5,9 @@ import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.templates.Template;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
@@ -28,12 +26,13 @@ public interface QueueFactoryInterface {
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
QueueInterface<Execution> execution();
QueueInterface<Executor> executor();
QueueInterface<WorkerJob> workerJob();
WorkerJobQueueInterface workerJob();
QueueInterface<WorkerTaskResult> workerTaskResult();
@@ -58,4 +57,6 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionResult> subflowExecutionResult();
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<ExecutionRunning> executionRunning();
}

View File

@@ -27,7 +27,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
void delete(String consumerGroup, T message) throws QueueException;
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
return receive((String) null, consumer);
return receive(null, consumer, false);
}
default Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {

View File

@@ -27,8 +27,6 @@ public class QueueService {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == MetricEntry.class) {
return null;
} else if (object.getClass() == ExecutionRunning.class) {
return ((ExecutionRunning) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionEnd.class) {
return ((SubflowExecutionEnd) object).getParentExecutionId();
} else {

View File

@@ -0,0 +1,12 @@
package io.kestra.core.queues;
import java.io.Serial;
public class UnsupportedMessageException extends QueueException {
@Serial
private static final long serialVersionUID = 1L;
public UnsupportedMessageException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -0,0 +1,12 @@
package io.kestra.core.queues;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either;
import java.util.function.Consumer;
public interface WorkerJobQueueInterface extends QueueInterface<WorkerJob> {
Runnable subscribe(String workerId, String workerGroup, Consumer<Either<WorkerJob, DeserializationException>> consumer);
}

View File

@@ -0,0 +1,29 @@
package io.kestra.core.reporter;
public abstract class AbstractReportable<T extends Reportable.Event> implements Reportable<T> {
private final Type type;
private final ReportingSchedule schedule;
private final boolean isTenantSupported;
public AbstractReportable(Type type, ReportingSchedule schedule, boolean isTenantSupported) {
this.type = type;
this.schedule = schedule;
this.isTenantSupported = isTenantSupported;
}
@Override
public boolean isTenantSupported() {
return isTenantSupported;
}
@Override
public Type type() {
return type;
}
@Override
public ReportingSchedule schedule() {
return schedule;
}
}

View File

@@ -0,0 +1,94 @@
package io.kestra.core.reporter;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
/**
* Interface for reporting server event for a specific type.
*
* @param <T>
*/
public interface Reportable<T extends Reportable.Event> {
/**
* Gets the type of the event to report.
*/
Type type();
/**
* Gets the reporting schedule.
*/
ReportingSchedule schedule();
/**
* Generates a report for the given timestamp.
*
* @param now the time when the report is triggered.
* @return an Optional containing the report data if available.
*/
T report(Instant now, TimeInterval interval);
default T report(Instant now) {
ZonedDateTime to = now.atZone(ZoneId.systemDefault());
ZonedDateTime from = to.minus(Duration.ofDays(1));
return report(now, new TimeInterval(from, to));
}
/**
* Checks whether this reportable is enabled for scheduled reporting.
*/
boolean isEnabled();
/**
* Generates a report for the given timestamp and tenant.
*
* @param now the time when the report is triggered.
* @param tenant the tenant for which the report is triggered.
* @return the event to report.
*/
default T report(Instant now, TimeInterval interval, String tenant) {
throw new UnsupportedOperationException();
}
default T report(Instant now, String tenant) {
ZonedDateTime to = now.atZone(ZoneId.systemDefault());
ZonedDateTime from = to.minus(Duration.ofDays(1));
return report(now, new TimeInterval(from, to), tenant);
}
/**
* Checks whether this {@link Reportable} can accept a tenant.
*
* @return {@code true} a {@link #report(Instant, TimeInterval, String)} can called, Otherwise {@code false}.
*/
default boolean isTenantSupported() {
return false;
}
record TimeInterval(ZonedDateTime from, ZonedDateTime to){
public static TimeInterval of(ZonedDateTime from, ZonedDateTime to) {
return new TimeInterval(from, to);
}
}
/**
* Marker interface indicating that the returned event
* must be a structured, domain-specific object
* (not a primitive wrapper, String, collection, or other basic type).
*/
interface Event {
}
/**
* Defines the schedule for a report.
*/
interface ReportingSchedule {
/**
* Determines whether a report should run at the given instant.
*/
boolean shouldRun(Instant now);
}
}

View File

@@ -0,0 +1,40 @@
package io.kestra.core.reporter;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@Singleton
@Slf4j
public class ReportableRegistry {
private final Map<Type, Reportable<?>> reportables = new ConcurrentHashMap<>();
/**
* Creates a new {@link ReportableRegistry} instance.
*
* @param reportables The {@link Reportable reportables}
*/
@Inject
public ReportableRegistry(final List<Reportable<?>> reportables) {
reportables.forEach(reportable -> this.reportables.put(reportable.type(), reportable));
}
public void register(final Reportable<?> reportable) {
Objects.requireNonNull(reportable, "reportable must not be null");
if (reportables.containsKey(reportable.type())) {
log.warn("Event already registered for type '{}'", reportable.type());
} else {
reportables.put(reportable.type(), reportable);
}
}
public List<Reportable<?>> getAll() {
return List.copyOf(reportables.values());
}
}

View File

@@ -0,0 +1,43 @@
package io.kestra.core.reporter;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Clock;
import java.time.Instant;
@Singleton
@Requires(property = "kestra.anonymous-usage-report.enabled", value = "true")
@Requires(property = "kestra.server-type")
@Slf4j
public class ReportableScheduler {
private final ReportableRegistry registry;
private final ServerEventSender sender;
private final Clock clock;
@Inject
public ReportableScheduler(ReportableRegistry registry, ServerEventSender sender) {
this.registry = registry;
this.sender = sender;
this.clock = Clock.systemDefaultZone();
}
@Scheduled(fixedDelay = "5m", initialDelay = "${kestra.anonymous-usage-report.initial-delay}")
public void tick() {
Instant now = clock.instant();
for (Reportable<?> r : registry.getAll()) {
if (r.isEnabled() && r.schedule().shouldRun(now)) {
try {
Object value = r.report(now);
if (value != null) sender.send(now, r.type(), value);
} catch (Exception e) {
log.debug("Failed to send report for event-type '{}'", r.type(), e);
}
}
}
}
}

View File

@@ -0,0 +1,57 @@
package io.kestra.core.reporter;
import io.kestra.core.reporter.Reportable.ReportingSchedule;
import java.time.Duration;
import java.time.Instant;
/**
* Utility class providing common implementations of {@link Reportable.ReportingSchedule}.
*/
public class Schedules {
/**
* Creates a reporting schedule that triggers after the specified period has elapsed
* since the last execution.
*
* @param period the duration between successive runs; must be positive
* @return a {@link Reportable.ReportingSchedule} that runs at the given interval
* @throws IllegalArgumentException if {@code period} is zero or negative
*/
public static ReportingSchedule every(final Duration period) {
if (period.isZero() || period.isNegative()) {
throw new IllegalArgumentException("Period must be positive");
}
return new ReportingSchedule() {
private Instant lastRun = Instant.EPOCH;
@Override
public boolean shouldRun(Instant now) {
if (Duration.between(lastRun, now).compareTo(period) >= 0) {
lastRun = now;
return true;
}
return false;
}
};
}
/**
* Creates a reporting schedule that triggers once every hour.
*
* @return a schedule running every 1 hour
*/
public static ReportingSchedule hourly() {
return every(Duration.ofHours(1));
}
/**
* Creates a reporting schedule that triggers once every day.
*
* @return a schedule running every 24 hours
*/
public static ReportingSchedule daily() {
return every(Duration.ofDays(1));
}
}

View File

@@ -0,0 +1,31 @@
package io.kestra.core.reporter;
import com.fasterxml.jackson.annotation.JsonUnwrapped;
import io.kestra.core.models.ServerType;
import lombok.Builder;
import java.time.ZoneId;
import java.time.ZonedDateTime;
/**
* Represents a Kestra Server Event.
*/
@Builder(toBuilder = true)
public record ServerEvent(
String instanceUuid,
String sessionUuid,
ServerType serverType,
String serverVersion,
ZoneId zoneId,
Object payload,
String uuid,
ZonedDateTime reportedAt
) {
@JsonUnwrapped
public Object payload() {
return payload;
}
}

View File

@@ -0,0 +1,91 @@
package io.kestra.core.reporter;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.Result;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.InstanceService;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.VersionProvider;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.hateoas.JsonError;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.time.Instant;
import java.time.ZoneId;
import java.util.UUID;
@Singleton
@Slf4j
public class ServerEventSender {
private static final String SESSION_UUID = IdUtils.create();
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();
@Inject
@Client
private ReactorHttpClient client;
@Inject
private VersionProvider versionProvider;
@Inject
private InstanceService instanceService;
private final ServerType serverType;
@Value("${kestra.anonymous-usage-report.uri}")
protected URI url;
public ServerEventSender( ) {
this.serverType = KestraContext.getContext().getServerType();
}
public void send(final Instant now, final Type type, Object event) {
ServerEvent serverEvent = ServerEvent
.builder()
.uuid(UUID.randomUUID().toString())
.sessionUuid(SESSION_UUID)
.instanceUuid(instanceService.fetch())
.serverType(serverType)
.serverVersion(versionProvider.getVersion())
.reportedAt(now.atZone(ZoneId.systemDefault()))
.payload(event)
.zoneId(ZoneId.systemDefault())
.build();
try {
MutableHttpRequest<ServerEvent> request = this.request(serverEvent, type);
if (log.isTraceEnabled()) {
log.trace("Report anonymous usage: '{}'", OBJECT_MAPPER.writeValueAsString(serverEvent));
}
this.handleResponse(client.toBlocking().retrieve(request, Argument.of(Result.class), Argument.of(JsonError.class)));
} catch (HttpClientResponseException t) {
log.trace("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
} catch (Exception t) {
log.trace("Unable to handle anonymous usage", t);
}
}
private void handleResponse (Result result){
}
protected MutableHttpRequest<ServerEvent> request(ServerEvent event, Type type) throws Exception {
URI baseUri = URI.create(this.url.toString().endsWith("/") ? this.url.toString() : this.url + "/");
URI resolvedUri = baseUri.resolve(type.name().toLowerCase());
return HttpRequest.POST(resolvedUri, event)
.header("User-Agent", "Kestra/" + versionProvider.getVersion());
}
}

View File

@@ -0,0 +1,9 @@
package io.kestra.core.reporter;
/**
* A reportable event type.
*/
public interface Type {
String name();
}

View File

@@ -0,0 +1,12 @@
package io.kestra.core.reporter;
/**
* All supported reportable event type.
*/
public enum Types implements Type {
USAGE,
SYSTEM_INFORMATION,
PLUGIN_METRICS,
SERVICE_USAGE,
PLUGIN_USAGE;
}

View File

@@ -0,0 +1,6 @@
package io.kestra.core.reporter.model;
public record Count(
long count
) {
}

View File

@@ -0,0 +1,80 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.ExecutionUsage;
import io.kestra.core.models.collectors.FlowUsage;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.kestra.core.reporter.model.Count;
import io.kestra.core.repositories.DashboardRepositoryInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
import java.time.Instant;
import java.util.Objects;
@Singleton
public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.UsageEvent> {
private final FlowRepositoryInterface flowRepository;
private final ExecutionRepositoryInterface executionRepository;
private final DashboardRepositoryInterface dashboardRepository;
private final boolean enabled;
@Inject
public FeatureUsageReport(FlowRepositoryInterface flowRepository,
ExecutionRepositoryInterface executionRepository,
DashboardRepositoryInterface dashboardRepository) {
super(Types.USAGE, Schedules.hourly(), true);
this.flowRepository = flowRepository;
this.executionRepository = executionRepository;
this.dashboardRepository = dashboardRepository;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public UsageEvent report(final Instant now, TimeInterval interval) {
return UsageEvent
.builder()
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository, interval.from(), interval.to()))
.dashboards(new Count(dashboardRepository.count()))
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public UsageEvent report(Instant now, TimeInterval interval, String tenant) {
Objects.requireNonNull(tenant, "tenant is null");
Objects.requireNonNull(interval, "interval is null");
return UsageEvent
.builder()
.flows(FlowUsage.of(tenant, flowRepository))
.executions(ExecutionUsage.of(tenant, executionRepository, interval.from(), interval.to()))
.build();
}
@SuperBuilder(toBuilder = true)
@Getter
@Jacksonized
@Introspected
public static class UsageEvent implements Event {
private ExecutionUsage executions;
private FlowUsage flows;
private Count dashboards;
}
}

View File

@@ -0,0 +1,105 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.PluginMetric;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.kestra.core.utils.ListUtils;
import io.micrometer.core.instrument.Timer;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@Singleton
public class PluginMetricReport extends AbstractReportable<PluginMetricReport.PluginMetricEvent> {
private final PluginRegistry pluginRegistry;
private final MetricRegistry metricRegistry;
private final boolean enabled;
@Inject
public PluginMetricReport(PluginRegistry pluginRegistry,
MetricRegistry metricRegistry) {
super(Types.PLUGIN_METRICS, Schedules.daily(), false);
this.metricRegistry = metricRegistry;
this.pluginRegistry = pluginRegistry;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.SCHEDULER.equals(serverType) || ServerType.WORKER.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public PluginMetricEvent report(final Instant now, final TimeInterval period) {
return PluginMetricEvent
.builder()
.pluginMetrics(pluginMetrics())
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Builder
@Introspected
public record PluginMetricEvent (
List<PluginMetric> pluginMetrics
) implements Event {
}
private List<PluginMetric> pluginMetrics() {
List<PluginMetric> taskMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTasks().stream())
.map(Class::getName)
.map(this::taskMetric)
.flatMap(Optional::stream)
.toList();
List<PluginMetric> triggerMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream())
.map(Class::getName)
.map(this::triggerMetric)
.flatMap(Optional::stream)
.toList();
return ListUtils.concat(taskMetrics, triggerMetrics);
}
private Optional<PluginMetric> taskMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_ENDED_DURATION).tag(MetricRegistry.TAG_TASK_TYPE, type).timer();
return fromTimer(type, duration);
}
private Optional<PluginMetric> triggerMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
if (duration == null) {
// this may be because this is a trigger executed by the scheduler, we search there instead
duration = metricRegistry.find(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
}
return fromTimer(type, duration);
}
private Optional<PluginMetric> fromTimer(String type, Timer timer) {
if (timer == null || timer.count() == 0) {
return Optional.empty();
}
double count = timer.count();
double totalTime = timer.totalTime(TimeUnit.MILLISECONDS);
double meanTime = timer.mean(TimeUnit.MILLISECONDS);
return Optional.of(new PluginMetric(type, count, totalTime, meanTime));
}
}

View File

@@ -0,0 +1,51 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.PluginUsage;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.time.Instant;
import java.util.List;
@Singleton
public class PluginUsageReport extends AbstractReportable<PluginUsageReport.PluginUsageEvent> {
private final PluginRegistry pluginRegistry;
private final boolean enabled;
@Inject
public PluginUsageReport(PluginRegistry pluginRegistry) {
super(Types.PLUGIN_USAGE, Schedules.daily(), false);
this.pluginRegistry = pluginRegistry;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public PluginUsageEvent report(final Instant now, final TimeInterval period) {
return PluginUsageEvent
.builder()
.plugins(PluginUsage.of(pluginRegistry))
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Builder
@Introspected
public record PluginUsageEvent(
List<PluginUsage> plugins
) implements Event {
}
}

View File

@@ -0,0 +1,53 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.ServiceUsage;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.time.Duration;
import java.time.Instant;
@Singleton
public class ServiceUsageReport extends AbstractReportable<ServiceUsageReport.ServiceUsageEvent> {
private final ServiceInstanceRepositoryInterface serviceInstanceRepository;
private final boolean isEnabled;
@Inject
public ServiceUsageReport(ServiceInstanceRepositoryInterface serviceInstanceRepository) {
super(Types.SERVICE_USAGE, Schedules.daily(), false);
this.serviceInstanceRepository = serviceInstanceRepository;
ServerType serverType = KestraContext.getContext().getServerType();
this.isEnabled = ServerType.STANDALONE.equals(serverType) || ServerType.EXECUTOR.equals(serverType);
}
@Override
public ServiceUsageEvent report(final Instant now, final TimeInterval period) {
return ServiceUsageEvent
.builder()
.services(ServiceUsage.of(period.from().toInstant(), period.to().toInstant(), serviceInstanceRepository, Duration.ofMinutes(5)))
.build();
}
@Override
public boolean isEnabled() {
return isEnabled;
}
@Builder
@Introspected
public record ServiceUsageEvent(
ServiceUsage services
) implements Event {
}
}

View File

@@ -0,0 +1,63 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.models.collectors.ConfigurationUsage;
import io.kestra.core.models.collectors.HostUsage;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.lang.management.ManagementFactory;
import java.time.Instant;
import java.util.Set;
@Singleton
public class SystemInformationReport extends AbstractReportable<SystemInformationReport.SystemInformationEvent> {
private final Environment environment;
private final ApplicationContext applicationContext;
private final String kestraUrl;
private final Instant startTime;
@Inject
public SystemInformationReport(ApplicationContext applicationContext) {
super(Types.SYSTEM_INFORMATION, Schedules.daily(), false);
this.environment = applicationContext.getEnvironment();
this.applicationContext = applicationContext;
this.kestraUrl = applicationContext.getProperty("kestra.url", String.class).orElse(null);
this.startTime = Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime());
}
@Override
public SystemInformationEvent report(final Instant now, final TimeInterval timeInterval) {
return SystemInformationEvent
.builder()
.environments(environment.getActiveNames())
.configurations(ConfigurationUsage.of(applicationContext))
.startTime(startTime)
.host(HostUsage.of())
.uri(kestraUrl)
.build();
}
@Override
public boolean isEnabled() {
return true;
}
@Builder
@Introspected
public record SystemInformationEvent(
Set<String> environments,
HostUsage host,
ConfigurationUsage configurations,
Instant startTime,
String uri
) implements Event {
}
}

View File

@@ -16,6 +16,14 @@ import java.util.Map;
import java.util.Optional;
public interface DashboardRepositoryInterface {
/**
* Gets the total number of Dashboards.
*
* @return the total number.
*/
long count();
Boolean isEnabled();
Optional<Dashboard> get(String tenantId, String id);

View File

@@ -161,7 +161,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
}
List<Execution> lastExecutions(
@Nullable String tenantId,
String tenantId,
@Nullable List<FlowFilter> flows
);
}

View File

@@ -81,9 +81,7 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
Flux<LogEntry> findAsync(
@Nullable String tenantId,
@Nullable String namespace,
@Nullable Level minLevel,
ZonedDateTime startDate
List<QueryFilter> filters
);
Flux<LogEntry> findAllAsync(@Nullable String tenantId);
@@ -96,5 +94,7 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
void deleteByQuery(String tenantId, String namespace, String flowId, String triggerId);
void deleteByFilters(String tenantId, List<QueryFilter> filters);
int deleteByQuery(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate);
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.utils.IdUtils;
import jakarta.validation.constraints.NotNull;
@@ -11,7 +12,7 @@ import lombok.With;
@Value
@AllArgsConstructor
@Builder
public class ExecutionRunning {
public class ExecutionRunning implements HasUID {
String tenantId;
@NotNull
@@ -26,6 +27,7 @@ public class ExecutionRunning {
@With
ConcurrencyState concurrencyState;
@Override
public String uid() {
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId, this.execution.getId());
}

View File

@@ -86,7 +86,7 @@ public class Executor {
public Boolean canBeProcessed() {
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint());
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
}
public Executor withFlow(FlowWithSource flow) {

View File

@@ -102,49 +102,39 @@ public class ExecutorService {
return this.flowExecutorInterface;
}
public Executor checkConcurrencyLimit(Executor executor, FlowInterface flow, Execution execution, long count) {
// if above the limit, handle concurrency limit based on its behavior
if (count >= flow.getConcurrency().getLimit()) {
public ExecutionRunning processExecutionRunning(FlowInterface flow, int runningCount, ExecutionRunning executionRunning) {
// if concurrency was removed, it can be null as we always get the latest flow definition
if (flow.getConcurrency() != null && runningCount >= flow.getConcurrency().getLimit()) {
return switch (flow.getConcurrency().getBehavior()) {
case QUEUE -> {
var newExecution = execution.withState(State.Type.QUEUED);
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.execution(newExecution)
.concurrencyState(ExecutionRunning.ConcurrencyState.QUEUED)
.build();
// when max concurrency is reached, we throttle the execution and stop processing
logService.logExecution(
newExecution,
executionRunning.getExecution(),
Level.INFO,
"Flow is queued due to concurrency limit exceeded, {} running(s)",
count
"Execution is queued due to concurrency limit exceeded, {} running(s)",
runningCount
);
// return the execution queued
yield executor
.withExecutionRunning(executionRunning)
.withExecution(newExecution, "checkConcurrencyLimit");
var newExecution = executionRunning.getExecution().withState(State.Type.QUEUED);
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
yield executionRunning
.withExecution(newExecution)
.withConcurrencyState(ExecutionRunning.ConcurrencyState.QUEUED);
}
case CANCEL ->
executor.withExecution(execution.withState(State.Type.CANCELLED), "checkConcurrencyLimit");
executionRunning
.withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
case FAIL ->
executor.withException(new IllegalStateException("Flow is FAILED due to concurrency limit exceeded"), "checkConcurrencyLimit");
executionRunning
.withExecution(executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded")).getExecution())
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
};
}
// if under the limit, update the executor with a RUNNING ExecutionRunning to track them
var executionRunning = new ExecutionRunning(
flow.getTenantId(),
flow.getNamespace(),
flow.getId(),
executor.getExecution(),
ExecutionRunning.ConcurrencyState.RUNNING
);
return executor.withExecutionRunning(executionRunning);
// if under the limit, run it!
return executionRunning
.withExecution(executionRunning.getExecution().withState(State.Type.RUNNING))
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
}
public Executor process(Executor executor) {
@@ -247,9 +237,9 @@ public class ExecutorService {
try {
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
} catch (Exception e) {
// This will lead to the next task being still executed but at least Kestra will not crash.
// This will lead to the next task being still executed, but at least Kestra will not crash.
// This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), e);
runContext.logger().error("Unable to resolve state from the Flowable task: {}", e.getMessage(), e);
state = Optional.of(State.Type.FAILED);
}
Optional<WorkerTaskResult> endedTask = childWorkerTaskTypeToWorkerTask(
@@ -390,11 +380,9 @@ public class ExecutorService {
if (flow.getOutputs() != null) {
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
try {
Map<String, Object> outputs = flow.getOutputs()
.stream()
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
outputs = runContext.render(outputs);
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
newExecution = newExecution.withOutputs(outputs);
} catch (Exception e) {
@@ -599,6 +587,23 @@ public class ExecutorService {
list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId()))
.collect(Collectors.toCollection(ArrayList::new));
}
// If the task is a flowable and its terminated, check that all children are terminated.
// This may not be the case for parallel flowable tasks like Parallel, Dag, ForEach...
// After a fail task, some child flowable may not be correctly terminated.
if (task instanceof FlowableTask<?> && taskRun.getState().isTerminated()) {
List<TaskRun> updated = executor.getExecution().findChildren(taskRun).stream()
.filter(child -> !child.getState().isTerminated())
.map(throwFunction(child -> child.withState(taskRun.getState().getCurrent())))
.toList();
if (!updated.isEmpty()) {
Execution execution = executor.getExecution();
for (TaskRun child : updated) {
execution = execution.withTaskRun(child);
}
executor = executor.withExecution(execution, "handledTerminatedFlowableTasks");
}
}
}
metricRegistry

View File

@@ -2,7 +2,6 @@ package io.kestra.core.runners;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.KestraRuntimeException;
@@ -12,11 +11,14 @@ import io.kestra.core.models.flows.DependsOn;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Output;
import io.kestra.core.models.flows.RenderableInput;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.FileInput;
import io.kestra.core.models.flows.input.InputAndValue;
import io.kestra.core.models.flows.input.ItemTypeInterface;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.property.URIFetcher;
import io.kestra.core.models.tasks.common.EncryptedString;
import io.kestra.core.models.validations.ManualConstraintViolation;
@@ -75,16 +77,19 @@ public class FlowInputOutput {
private final StorageInterface storageInterface;
private final Optional<String> secretKey;
private final RunContextFactory runContextFactory;
private final VariableRenderer variableRenderer;
@Inject
public FlowInputOutput(
StorageInterface storageInterface,
RunContextFactory runContextFactory,
VariableRenderer variableRenderer,
@Nullable @Value("${kestra.encryption.secret-key}") String secretKey
) {
this.storageInterface = storageInterface;
this.runContextFactory = runContextFactory;
this.secretKey = Optional.ofNullable(secretKey);
this.variableRenderer = variableRenderer;
}
/**
@@ -249,11 +254,7 @@ public class FlowInputOutput {
}
final Map<String, ResolvableInput> resolvableInputMap = Collections.unmodifiableMap(inputs.stream()
.map(input -> {
// get value or default
Object value = Optional.ofNullable((Object) data.get(input.getId())).orElseGet(input::getDefaults);
return ResolvableInput.of(input, value);
})
.map(input -> ResolvableInput.of(input,data.get(input.getId())))
.collect(Collectors.toMap(it -> it.get().input().getId(), Function.identity(), (o1, o2) -> o1, LinkedHashMap::new)));
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap));
@@ -312,8 +313,16 @@ public class FlowInputOutput {
});
resolvable.setInput(input);
Object value = resolvable.get().value();
// resolve default if needed
if (value == null && input.getDefaults() != null) {
value = resolveDefaultValue(input, runContext);
resolvable.isDefault(true);
}
// validate and parse input value
final Object value = resolvable.get().value();
if (value == null) {
if (input.getRequired()) {
resolvable.resolveWithError(input.toConstraintViolationException("missing required input", null));
@@ -341,7 +350,33 @@ public class FlowInputOutput {
return resolvable.get();
}
public static Object resolveDefaultValue(Input<?> input, PropertyContext renderer) throws IllegalVariableEvaluationException {
return switch (input.getType()) {
case STRING, ENUM, SELECT, SECRET, EMAIL -> resolveDefaultPropertyAs(input, renderer, String.class);
case INT -> resolveDefaultPropertyAs(input, renderer, Integer.class);
case FLOAT -> resolveDefaultPropertyAs(input, renderer, Float.class);
case BOOLEAN, BOOL -> resolveDefaultPropertyAs(input, renderer, Boolean.class);
case DATETIME -> resolveDefaultPropertyAs(input, renderer, Instant.class);
case DATE -> resolveDefaultPropertyAs(input, renderer, LocalDate.class);
case TIME -> resolveDefaultPropertyAs(input, renderer, LocalTime.class);
case DURATION -> resolveDefaultPropertyAs(input, renderer, Duration.class);
case FILE, URI -> resolveDefaultPropertyAs(input, renderer, URI.class);
case JSON, YAML -> resolveDefaultPropertyAs(input, renderer, Object.class);
case ARRAY -> resolveDefaultPropertyAsList(input, renderer, Object.class);
case MULTISELECT -> resolveDefaultPropertyAsList(input, renderer, String.class);
};
}
@SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.as((Property<T>) input.getDefaults(), renderer, clazz);
}
@SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
}
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies) {
Map<String, Object> flattenInputs = MapUtils.flattenToNestedMap(dependencies.entrySet()
.stream()
@@ -368,7 +403,7 @@ public class FlowInputOutput {
final Map<String, Object> in
) {
if (flow.getOutputs() == null) {
return ImmutableMap.of();
return Map.of();
}
Map<String, Object> results = flow
.getOutputs()
@@ -376,6 +411,9 @@ public class FlowInputOutput {
.map(output -> {
Object current = in == null ? null : in.get(output.getId());
try {
if (current == null && Boolean.FALSE.equals(output.getRequired())) {
return Optional.of(new AbstractMap.SimpleEntry<>(output.getId(), null));
}
return parseData(execution, output, current)
.map(entry -> {
if (output.getType().equals(Type.SECRET)) {
@@ -406,7 +444,7 @@ public class FlowInputOutput {
if (data.getType() == null) {
return Optional.of(new AbstractMap.SimpleEntry<>(data.getId(), current));
}
final Type elementType = data instanceof ItemTypeInterface itemTypeInterface ? itemTypeInterface.getItemType() : null;
return Optional.of(new AbstractMap.SimpleEntry<>(
@@ -483,6 +521,30 @@ public class FlowInputOutput {
throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
}
}
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
if (outputs == null) return Map.of();
// render required outputs
Map<String, Object> outputsById = outputs
.stream()
.filter(output -> output.getRequired() == null || output.getRequired())
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
outputsById = runContext.render(outputsById);
// render optional outputs one by one to catch, log, and skip any error.
for (io.kestra.core.models.flows.Output output : outputs) {
if (Boolean.FALSE.equals(output.getRequired())) {
try {
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
} catch (Exception e) {
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
outputsById.put(output.getId(), null);
}
}
}
return outputsById;
}
/**
* Mutable wrapper to hold a flow's input, and it's resolved value.
@@ -511,22 +573,26 @@ public class FlowInputOutput {
return input;
}
public void isDefault(boolean isDefault) {
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
}
public void setInput(final Input<?> input) {
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.exception());
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
}
public void resolveWithEnabled(boolean enabled) {
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.exception());
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exception());
markAsResolved();
}
public void resolveWithValue(@Nullable Object value) {
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.exception());
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exception());
markAsResolved();
}
public void resolveWithError(@Nullable ConstraintViolationException exception) {
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), exception);
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
markAsResolved();
}

View File

@@ -286,18 +286,10 @@ public class FlowableUtils {
// start as many tasks as we have concurrency slots
return collect.values().stream()
.map(resolvedTasks -> filterCreated(resolvedTasks, taskRuns, parentTaskRun))
.map(resolvedTasks -> resolveSequentialNexts(execution, resolvedTasks, null, null, parentTaskRun))
.filter(resolvedTasks -> !resolvedTasks.isEmpty())
.limit(concurrencySlots)
.map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
.toList();
}
private static List<ResolvedTask> filterCreated(List<ResolvedTask> tasks, List<TaskRun> taskRuns, TaskRun parentTaskRun) {
return tasks.stream()
.filter(resolvedTask -> taskRuns.stream()
.noneMatch(taskRun -> FlowableUtils.isTaskRunFor(resolvedTask, taskRun, parentTaskRun))
)
.map(resolvedTasks -> resolvedTasks.getFirst())
.toList();
}

View File

@@ -6,6 +6,7 @@ import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.kv.KVStore;
@@ -18,7 +19,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
public abstract class RunContext {
public abstract class RunContext implements PropertyContext {
/**
* Returns the trigger execution id attached to this context.

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.PluginConfigurations;
@@ -77,7 +78,7 @@ public class RunContextFactory {
public RunContextInitializer initializer() {
return applicationContext.getBean(RunContextInitializer.class);
}
public RunContext of(FlowInterface flow, Execution execution) {
return of(flow, execution, Function.identity());
}
@@ -98,7 +99,7 @@ public class RunContextFactory {
.withDecryptVariables(true)
.withSecretInputs(secretInputsFromFlow(flow))
)
.build(runContextLogger))
.build(runContextLogger, PropertyContext.create(variableRenderer)))
.withSecretInputs(secretInputsFromFlow(flow))
.build();
}
@@ -127,7 +128,7 @@ public class RunContextFactory {
.withTaskRun(taskRun)
.withDecryptVariables(decryptVariables)
.withSecretInputs(secretInputsFromFlow(flow))
.build(runContextLogger))
.build(runContextLogger, PropertyContext.create(variableRenderer)))
.withKvStoreService(kvStoreService)
.withSecretInputs(secretInputsFromFlow(flow))
.withTask(task)
@@ -146,7 +147,7 @@ public class RunContextFactory {
.withFlow(flow)
.withTrigger(trigger)
.withSecretInputs(secretInputsFromFlow(flow))
.build(runContextLogger)
.build(runContextLogger, PropertyContext.create(variableRenderer))
)
.withSecretInputs(secretInputsFromFlow(flow))
.withTrigger(trigger)

View File

@@ -4,15 +4,11 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -27,12 +23,19 @@ public class RunContextProperty<T> {
private final RunContext runContext;
private final Task task;
private final AbstractTrigger trigger;
private final boolean skipCache;
RunContextProperty(Property<T> property, RunContext runContext) {
this(property, runContext, false);
}
RunContextProperty(Property<T> property, RunContext runContext, boolean skipCache) {
this.property = property;
this.runContext = runContext;
this.task = ((DefaultRunContext) runContext).getTask();
this.trigger = ((DefaultRunContext) runContext).getTrigger();
this.skipCache = skipCache;
}
private void validate() {
@@ -45,6 +48,19 @@ public class RunContextProperty<T> {
log.trace("Unable to do validation: no task or trigger found");
}
}
/**
* Returns a new {@link RunContextProperty} that will always be rendered by evaluating
* its original Pebble expression, without using any previously cached value.
* <p>
* This ensures that each time the property is rendered, the underlying
* expression is re-evaluated to produce a fresh result.
*
* @return a new {@link Property} that bypasses the cache
*/
public RunContextProperty<T> skipCache() {
return new RunContextProperty<>(this.property, this.runContext, true);
}
/**
* Render a property then convert it to its target type and validate it.<br>
@@ -55,13 +71,13 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz)));
validate();
return as;
}
/**
* Render a property with additional variables, then convert it to its target type and validate it.<br>
*
@@ -71,7 +87,7 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz, variables)));
validate();
@@ -89,7 +105,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz)))
.orElse((T) Collections.emptyList());
@@ -108,7 +124,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz, variables)))
.orElse((T) Collections.emptyList());
@@ -127,7 +143,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass)))
.orElse((T) Collections.emptyMap());
@@ -146,11 +162,15 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass, variables)))
.orElse((T) Collections.emptyMap());
validate();
return as;
}
private Property<T> getProperty() {
return skipCache ? this.property.skipCache() : this.property;
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.runners;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
@@ -9,6 +10,7 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.input.SecretInput;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.utils.ListUtils;
@@ -138,10 +140,10 @@ public final class RunVariables {
* @param logger The {@link RunContextLogger logger}
* @return The immutable map of variables.
*/
Map<String, Object> build(final RunContextLogger logger);
Map<String, Object> build(RunContextLogger logger, PropertyContext propertyContext);
}
public record KestraConfiguration(String environment, String url) { }
public record KestraConfiguration(String environment, String url) { }
/**
* Default builder class for constructing variables.
@@ -174,7 +176,7 @@ public final class RunVariables {
// Note: for performance reason, cloning maps should be avoided as much as possible.
@Override
public Map<String, Object> build(final RunContextLogger logger) {
public Map<String, Object> build(final RunContextLogger logger, final PropertyContext propertyContext) {
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
builder.put("envs", envs != null ? envs : Map.of());
@@ -280,9 +282,15 @@ public final class RunVariables {
if (flow != null && flow.getInputs() != null) {
// we add default inputs value from the flow if not already set, this will be useful for triggers
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> inputs.put(input.getId(), input.getDefaults()));
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> {
try {
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
} catch (IllegalVariableEvaluationException e) {
throw new RuntimeException("Unable to inject default value for input '" + input.getId() + "'", e);
}
});
}
if (!inputs.isEmpty()) {

View File

@@ -85,7 +85,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
@Inject
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
private QueueInterface<WorkerJob> workerJobQueue;
private WorkerJobQueueInterface workerJobQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
@@ -274,12 +274,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
}
}));
this.receiveCancellations.addFirst(this.workerJobQueue.receive(
this.receiveCancellations.addFirst(this.workerJobQueue.subscribe(
this.id,
this.workerGroup,
Worker.class,
either -> {
pendingJobCount.incrementAndGet();
executorService.execute(() -> {
pendingJobCount.decrementAndGet();
runningJobCount.incrementAndGet();
@@ -764,6 +763,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
this.workerTaskResultQueue.emit(workerTaskResult);
// upload the cache file, hash may not be present if we didn't succeed in computing it
@@ -796,6 +796,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
// If it's a message too big, we remove the outputs
failed = failed.withOutputs(Variables.empty());
}
if (e instanceof UnsupportedMessageException) {
// we expect the offending char is in the output so we remove it
failed = failed.withOutputs(Variables.empty());
}
WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
@@ -818,7 +822,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
private Optional<String> hashTask(RunContext runContext, Task task) {
try {
var map = JacksonMapper.toMap(task);
var rMap = runContext.render(map);
// If there are task provided variables, rendering the task may fail.
// The best we can do is to add a fake 'workingDir' as it's an often added variables,
// and it should not be part of the task hash.
Map<String, Object> variables = Map.of("workingDir", "workingDir");
var rMap = runContext.render(map, variables);
var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
MessageDigest digest = MessageDigest.getInstance("SHA-256");
digest.update(json);

View File

@@ -102,6 +102,19 @@ public abstract class AbstractDate {
}
if (value instanceof Long longValue) {
if(value.toString().length() == 13) {
return Instant.ofEpochMilli(longValue).atZone(zoneId);
}else if(value.toString().length() == 19 ){
if(value.toString().endsWith("000")){
long seconds = longValue/1_000_000_000;
int nanos = (int) (longValue%1_000_000_000);
return Instant.ofEpochSecond(seconds,nanos).atZone(zoneId);
}else{
long milliseconds = longValue/1_000_000;
int micros = (int) (longValue%1_000_000);
return Instant.ofEpochMilli(milliseconds).atZone(zoneId).withNano(micros*1000);
}
}
return Instant.ofEpochSecond(longValue).atZone(zoneId);
}

View File

@@ -8,6 +8,7 @@ import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
@@ -29,10 +30,7 @@ import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.*;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.*;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.util.CollectionUtils;
@@ -91,7 +89,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private volatile Boolean isReady = false;
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> scheduledFuture;
private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> executionMonitorFuture;
@Getter
protected SchedulerTriggerStateInterface triggerState;
@@ -152,7 +152,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.flowListeners.run();
this.flowListeners.listen(this::initializedTriggers);
ScheduledFuture<?> evaluationLoop = scheduleExecutor.scheduleAtFixedRate(
scheduledFuture = scheduleExecutor.scheduleAtFixedRate(
this::handle,
0,
1,
@@ -162,10 +162,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the evaluation loop thread
Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
() -> {
Await.until(evaluationLoop::isDone);
Await.until(scheduledFuture::isDone);
try {
evaluationLoop.get();
scheduledFuture.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -177,7 +177,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
);
// Periodically report metrics and logs of running executions
ScheduledFuture<?> monitoringLoop = executionMonitorExecutor.scheduleWithFixedDelay(
executionMonitorFuture = executionMonitorExecutor.scheduleWithFixedDelay(
this::executionMonitor,
30,
10,
@@ -187,10 +187,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the monitoring loop thread
Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
() -> {
Await.until(monitoringLoop::isDone);
Await.until(executionMonitorFuture::isDone);
try {
monitoringLoop.get();
executionMonitorFuture.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -318,7 +318,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
}
synchronized (this) { // we need a sync block as we read then update so we should not do it in multiple threads concurrently
List<Trigger> triggers = triggerState.findAllForAllTenants();
Map<String, Trigger> triggers = triggerState.findAllForAllTenants().stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
flows
.stream()
@@ -328,7 +328,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.flatMap(flow -> flow.getTriggers().stream().filter(trigger -> trigger instanceof WorkerTriggerInterface).map(trigger -> new FlowAndTrigger(flow, trigger)))
.distinct()
.forEach(flowAndTrigger -> {
Optional<Trigger> trigger = triggers.stream().filter(t -> t.uid().equals(Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger()))).findFirst(); // must have one or none
String triggerUid = Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger());
Optional<Trigger> trigger = Optional.ofNullable(triggers.get(triggerUid));
if (trigger.isEmpty()) {
RunContext runContext = runContextFactory.of(flowAndTrigger.flow(), flowAndTrigger.trigger());
ConditionContext conditionContext = conditionService.conditionContext(runContext, flowAndTrigger.flow(), null);
@@ -467,9 +468,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private List<FlowWithTriggers> computeSchedulable(List<FlowWithSource> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
List<String> flowIds = flows.stream().map(FlowId::uidWithoutRevision).toList();
Map<String, Trigger> triggerById = triggerContextsToEvaluate.stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
// delete trigger which flow has been deleted
triggerContextsToEvaluate.stream()
.filter(trigger -> !flows.stream().map(FlowId::uidWithoutRevision).toList().contains(FlowId.uid(trigger)))
.filter(trigger -> !flowIds.contains(FlowId.uid(trigger)))
.forEach(trigger -> {
try {
this.triggerState.delete(trigger);
@@ -491,12 +495,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.map(abstractTrigger -> {
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
Trigger triggerContext = null;
Trigger lastTrigger = triggerContextsToEvaluate
.stream()
.filter(triggerContextToFind -> triggerContextToFind.uid().equals(Trigger.uid(flow, abstractTrigger)))
.findFirst()
.orElse(null);
Trigger triggerContext;
Trigger lastTrigger = triggerById.get(Trigger.uid(flow, abstractTrigger));
// If a trigger is not found in triggers to evaluate, then we ignore it
if (lastTrigger == null) {
return null;
@@ -1006,8 +1006,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
setState(ServiceState.TERMINATING);
this.receiveCancellations.forEach(Runnable::run);
this.scheduleExecutor.shutdown();
this.executionMonitorExecutor.shutdown();
ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));
ExecutorsUtils.closeScheduledThreadPool(executionMonitorExecutor, Duration.ofSeconds(5), List.of(executionMonitorFuture));
try {
if (onClose != null) {
onClose.run();

View File

@@ -1,6 +1,7 @@
package io.kestra.core.server;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@@ -8,9 +9,11 @@ import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +28,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
protected final ServerConfig serverConfig;
private final AtomicBoolean isStopped = new AtomicBoolean(false);
private ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> scheduledFuture;
private Instant lastScheduledExecution;
/**
@@ -98,7 +102,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
Duration scheduleInterval = getScheduleInterval();
log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval);
scheduledExecutorService.scheduleAtFixedRate(
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
this,
0,
scheduleInterval.toSeconds(),
@@ -133,20 +137,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
@Override
public void close() {
if (isStopped.compareAndSet(false, true) && scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
if (scheduledExecutorService.isTerminated()) {
return;
}
try {
if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Timeout", name);
}
log.debug("Stopped scheduled '{}' task.", name);
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Interrupted.", name);
}
ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(scheduledFuture));
}
}
}

View File

@@ -250,9 +250,10 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
stateLock.lock();
// Optional callback to be executed at the end.
Runnable returnCallback = null;
localServiceState = localServiceState(service);
try {
localServiceState = localServiceState(service);
if (localServiceState == null) {
return null; // service has been unregistered.
}
@@ -301,7 +302,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
// Update the local instance
this.serviceRegistry.register(localServiceState.with(remoteInstance));
} catch (Exception e) {
final ServiceInstance localInstance = localServiceState(service).instance();
final ServiceInstance localInstance = localServiceState.instance();
log.error("[Service id={}, type='{}', hostname='{}'] Failed to update state to {}. Error: {}",
localInstance.uid(),
localInstance.type(),
@@ -317,7 +318,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
returnCallback.run();
}
}
return localServiceState(service).instance();
return Optional.ofNullable(localServiceState(service)).map(LocalServiceState::instance).orElse(null);
}
private void mayDisableStateUpdate(final Service service, final ServiceInstance instance) {
@@ -371,9 +372,11 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
final Service service,
final ServiceInstance instance,
final boolean isLivenessEnabled) {
// Never shutdown STANDALONE server or WEB_SERVER service.
if (instance.server().type().equals(ServerInstance.Type.STANDALONE) ||
instance.is(ServiceType.WEBSERVER)) {
// Never shutdown STANDALONE server or WEBSERVER and INDEXER services.
if (ServerInstance.Type.STANDALONE.equals(instance.server().type()) ||
instance.is(ServiceType.INDEXER) ||
instance.is(ServiceType.WEBSERVER)
) {
// Force the RUNNING state.
return Optional.of(instance.state(Service.ServiceState.RUNNING, now, null));
}

View File

@@ -1,5 +1,8 @@
package io.kestra.core.server;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.kestra.core.utils.Enums;
/**
* Supported Kestra's service types.
*/
@@ -9,4 +12,14 @@ public enum ServiceType {
SCHEDULER,
WEBSERVER,
WORKER,
INVALID;
@JsonCreator
public static ServiceType fromString(final String value) {
try {
return Enums.getForNameIgnoreCase(value, ServiceType.class, INVALID);
} catch (IllegalArgumentException e) {
return INVALID;
}
}
}

View File

@@ -1,22 +0,0 @@
package io.kestra.core.services;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.annotation.Scheduled;
import lombok.extern.slf4j.Slf4j;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@Slf4j
@Requires(property = "kestra.anonymous-usage-report.enabled", value = "true")
@Requires(property = "kestra.server-type")
public class CollectorScheduler {
@Inject
protected CollectorService collectorService;
@Scheduled(initialDelay = "${kestra.anonymous-usage-report.initial-delay}", fixedDelay = "${kestra.anonymous-usage-report.fixed-delay}")
public void report() {
collectorService.report();
}
}

View File

@@ -1,220 +0,0 @@
package io.kestra.core.services;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.*;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.VersionProvider;
import io.micrometer.core.instrument.Timer;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.hateoas.JsonError;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@Singleton
@Slf4j
public class CollectorService {
protected static final String UUID = IdUtils.create();
@Inject
@Client
protected ReactorHttpClient client;
@Inject
protected ApplicationContext applicationContext;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private ExecutionRepositoryInterface executionRepository;
@Inject
protected InstanceService instanceService;
@Inject
protected VersionProvider versionProvider;
@Inject
protected PluginRegistry pluginRegistry;
@Nullable
@Value("${kestra.server-type}")
protected ServerType serverType;
@Nullable
@Value("${kestra.url:}")
protected String kestraUrl;
@Value("${kestra.anonymous-usage-report.uri}")
protected URI url;
@Inject
private ServiceInstanceRepositoryInterface serviceRepository;
@Inject
private MetricRegistry metricRegistry;
private transient Usage defaultUsage;
protected synchronized Usage defaultUsage() {
boolean first = defaultUsage == null;
if (first) {
defaultUsage = Usage.builder()
.startUuid(UUID)
.instanceUuid(instanceService.fetch())
.serverType(serverType)
.version(versionProvider.getVersion())
.zoneId(ZoneId.systemDefault())
.uri(kestraUrl == null ? null : kestraUrl)
.environments(applicationContext.getEnvironment().getActiveNames())
.startTime(Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime()))
.host(HostUsage.of())
.configurations(ConfigurationUsage.of(applicationContext))
.plugins(PluginUsage.of(pluginRegistry))
.build();
}
return defaultUsage;
}
public Usage metrics(boolean details) {
return metrics(details, serverType == ServerType.WORKER || serverType == ServerType.SCHEDULER || serverType == ServerType.STANDALONE);
}
public Usage metrics(boolean details, boolean metrics) {
ZonedDateTime to = ZonedDateTime.now();
ZonedDateTime from = to
.toLocalDate()
.atStartOfDay(ZoneId.systemDefault())
.minusDays(1);
return metrics(details, metrics, from, to);
}
public Usage metrics(boolean details, boolean metrics, ZonedDateTime from, ZonedDateTime to) {
Usage.UsageBuilder<?, ?> builder = defaultUsage()
.toBuilder()
.uuid(IdUtils.create());
if (details) {
builder = builder
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository, from, to))
.services(ServiceUsage.of(from.toInstant(), to.toInstant(), serviceRepository, Duration.ofMinutes(5)));
}
if (metrics) {
builder = builder.pluginMetrics(pluginMetrics());
}
return builder.build();
}
public void report() {
try {
Usage metrics = this.metrics(serverType == ServerType.EXECUTOR || serverType == ServerType.STANDALONE);
MutableHttpRequest<Usage> post = this.request(metrics);
if (log.isTraceEnabled()) {
log.trace("Report anonymous usage: '{}'", JacksonMapper.ofJson().writeValueAsString(metrics));
}
Result result = client.toBlocking()
.retrieve(
post,
Argument.of(Result.class),
Argument.of(JsonError.class)
);
this.handleResponse(result);
} catch (HttpClientResponseException t) {
log.debug("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
} catch (Exception t) {
log.debug("Unable to handle anonymous usage", t);
}
}
private void handleResponse(Result result) {
}
protected MutableHttpRequest<Usage> request(Usage metrics) throws Exception {
return HttpRequest.POST(this.url, metrics)
.header("User-Agent", "Kestra/" + versionProvider.getVersion());
}
private List<PluginMetric> pluginMetrics() {
List<PluginMetric> taskMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTasks().stream())
.map(cls -> cls.getName())
.map(type -> taskMetric(type))
.filter(opt -> opt.isPresent())
.map(opt -> opt.get())
.toList();
List<PluginMetric> triggerMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream())
.map(cls -> cls.getName())
.map(type -> triggerMetric(type))
.filter(opt -> opt.isPresent())
.map(opt -> opt.get())
.toList();
return ListUtils.concat(taskMetrics, triggerMetrics);
}
private Optional<PluginMetric> taskMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_ENDED_DURATION).tag(MetricRegistry.TAG_TASK_TYPE, type).timer();
return fromTimer(type, duration);
}
private Optional<PluginMetric> triggerMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
if (duration == null) {
// this may be because this is a trigger executed by the scheduler, we search there instead
duration = metricRegistry.find(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
}
return fromTimer(type, duration);
}
private Optional<PluginMetric> fromTimer(String type, Timer timer) {
if (timer == null || timer.count() == 0) {
return Optional.empty();
}
double count = timer.count();
double totalTime = timer.totalTime(TimeUnit.MILLISECONDS);
double meanTime = timer.mean(TimeUnit.MILLISECONDS);
return Optional.of(new PluginMetric(type, count, totalTime, meanTime));
}
}

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ModelValidator;
@@ -51,7 +52,6 @@ import java.util.stream.StreamSupport;
@Singleton
@Slf4j
public class FlowService {
@Inject
Optional<FlowRepositoryInterface> flowRepository;
@@ -236,6 +236,7 @@ public class FlowService {
}
List<String> warnings = new ArrayList<>(checkValidSubflows(flow, tenantId));
List<io.kestra.plugin.core.trigger.Flow> flowTriggers = ListUtils.emptyOnNull(flow.getTriggers()).stream()
.filter(io.kestra.plugin.core.trigger.Flow.class::isInstance)
.map(io.kestra.plugin.core.trigger.Flow.class::cast)
@@ -246,6 +247,21 @@ public class FlowService {
}
});
// add warning for runnable properties (timeout, workerGroup, taskCache) when used not in a runnable
flow.allTasksWithChilds().forEach(task -> {
if (!(task instanceof RunnableTask<?>)) {
if (task.getTimeout() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'timeout' property as it's only relevant for runnable tasks.");
}
if (task.getTaskCache() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'taskCache' property as it's only relevant for runnable tasks.");
}
if (task.getWorkerGroup() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'workerGroup' property as it's only relevant for runnable tasks.");
}
}
});
return warnings;
}
@@ -531,29 +547,26 @@ public class FlowService {
throw noRepositoryException();
}
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly);
return expandAll ? recursiveFlowTopology(tenant, namespace, id, destinationOnly) : flowTopologies.stream();
return expandAll ? recursiveFlowTopology(new ArrayList<>(), tenant, namespace, id, destinationOnly) : flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly).stream();
}
private Stream<FlowTopology> recursiveFlowTopology(String tenantId, String namespace, String flowId, boolean destinationOnly) {
private Stream<FlowTopology> recursiveFlowTopology(List<FlowId> flowIds, String tenantId, String namespace, String id, boolean destinationOnly) {
if (flowTopologyRepository.isEmpty()) {
throw noRepositoryException();
}
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, flowId, destinationOnly);
List<FlowTopology> subTopologies = flowTopologies.stream()
// filter on destination is not the current node to avoid an infinite loop
.filter(topology -> !(topology.getDestination().getTenantId().equals(tenantId) && topology.getDestination().getNamespace().equals(namespace) && topology.getDestination().getId().equals(flowId)))
.toList();
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, id, destinationOnly);
if (subTopologies.isEmpty()) {
FlowId flowId = FlowId.of(tenantId, namespace, id, null);
if (flowIds.contains(flowId)) {
return flowTopologies.stream();
} else {
return Stream.concat(flowTopologies.stream(), subTopologies.stream()
.map(topology -> topology.getDestination())
// recursively fetch child nodes
.flatMap(destination -> recursiveFlowTopology(destination.getTenantId(), destination.getNamespace(), destination.getId(), destinationOnly)));
}
flowIds.add(flowId);
return flowTopologies.stream()
.flatMap(topology -> Stream.of(topology.getDestination(), topology.getSource()))
// recursively fetch child nodes
.flatMap(node -> recursiveFlowTopology(flowIds, node.getTenantId(), node.getNamespace(), node.getId(), destinationOnly));
}
private IllegalStateException noRepositoryException() {

View File

@@ -1,6 +1,7 @@
package io.kestra.core.services;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
@@ -10,7 +11,6 @@ import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInte
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.ListUtils;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -24,14 +24,15 @@ import java.util.stream.Stream;
@Singleton
public class FlowTriggerService {
@Inject
private ConditionService conditionService;
private final ConditionService conditionService;
private final RunContextFactory runContextFactory;
private final FlowService flowService;
@Inject
private RunContextFactory runContextFactory;
@Inject
private FlowService flowService;
public FlowTriggerService(ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) {
this.conditionService = conditionService;
this.runContextFactory = runContextFactory;
this.flowService = flowService;
}
// used in EE only
public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<FlowWithSource> allFlows) {
@@ -53,6 +54,8 @@ public class FlowTriggerService {
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
// prevent recursive flow triggers
.filter(flow -> flowService.removeUnwanted(flow, execution))
// filter out Test Executions
.filter(flow -> execution.getKind() == null)
// ensure flow & triggers are enabled
.filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())

View File

@@ -10,22 +10,34 @@ import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class InstanceService {
private final SettingRepositoryInterface settingRepository;
@Inject
private SettingRepositoryInterface settingRepository;
private Setting instanceIdSetting;
public InstanceService(SettingRepositoryInterface settingRepository) {
this.settingRepository = settingRepository;
}
private volatile Setting instanceIdSetting;
public String fetch() {
if (this.instanceIdSetting == null) {
instanceIdSetting = settingRepository
.findByKey(Setting.INSTANCE_UUID)
.orElseGet(() -> settingRepository.save(Setting.builder()
.key(Setting.INSTANCE_UUID)
.value(IdUtils.create())
.build()
));
synchronized (this) {
if (this.instanceIdSetting == null) {
instanceIdSetting = fetchInstanceUuid();
}
}
}
return this.instanceIdSetting.getValue().toString();
}
private Setting fetchInstanceUuid() {
return settingRepository
.findByKey(Setting.INSTANCE_UUID)
.orElseGet(() -> settingRepository.save(Setting.builder()
.key(Setting.INSTANCE_UUID)
.value(IdUtils.create())
.build()
));
}
}

View File

@@ -54,6 +54,18 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Retrieves an input stream of a instance resource for the given storage URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace of the object (may be null)
* @param uri the URI of the object to retrieve
* @return an InputStream to read the object's contents
* @throws IOException if the object cannot be read
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream getInstanceResource(@Nullable String namespace, URI uri) throws IOException;
/**
* Retrieves a storage object along with its metadata.
*
@@ -91,6 +103,18 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Lists the attributes of all instance files and instance directories under the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI to list
* @return a list of file attributes
* @throws IOException if the listing fails
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> listInstanceResource(@Nullable String namespace, URI uri) throws IOException;
/**
* Checks whether the given URI exists in the internal storage.
*
@@ -108,6 +132,23 @@ public interface StorageInterface extends AutoCloseable, Plugin {
}
}
/**
* Checks whether the given URI exists in the instance internal storage.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI to check
* @return true if the URI exists, false otherwise
*/
@SuppressWarnings("try")
default boolean existsInstanceResource(@Nullable String namespace, URI uri) {
try (InputStream ignored = getInstanceResource(namespace, uri)) {
return true;
} catch (IOException ieo) {
return false;
}
}
/**
* Retrieves the metadata attributes for the given URI.
*
@@ -120,6 +161,18 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Retrieves the metadata attributes for the given URI.
* n instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI of the object
* @return the file attributes
* @throws IOException if the attributes cannot be retrieved
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
FileAttributes getInstanceAttributes(@Nullable String namespace, URI uri) throws IOException;
/**
* Stores data at the given URI.
*
@@ -148,34 +201,86 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class})
URI put(String tenantId, @Nullable String namespace, URI uri, StorageObject storageObject) throws IOException;
/**
* Stores instance data at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the target URI
* @param data the input stream containing the data to store
* @return the URI of the stored object
* @throws IOException if storing fails
*/
@Retryable(includes = {IOException.class})
default URI putInstanceResource(@Nullable String namespace, URI uri, InputStream data) throws IOException {
return this.putInstanceResource(namespace, uri, new StorageObject(null, data));
}
/**
* Stores a instance storage object at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the target URI
* @param storageObject the storage object to store
* @return the URI of the stored object
* @throws IOException if storing fails
*/
@Retryable(includes = {IOException.class})
URI putInstanceResource(@Nullable String namespace, URI uri, StorageObject storageObject) throws IOException;
/**
* Deletes the object at the given URI.
*
* @param tenantId the tenant identifier (may be null for global deletion)
* @param tenantId the tenant identifier
* @param namespace the namespace (may be null)
* @param uri the URI of the object to delete
* @return true if deletion was successful
* @throws IOException if deletion fails
*/
@Retryable(includes = {IOException.class})
boolean delete(@Nullable String tenantId, @Nullable String namespace, URI uri) throws IOException;
boolean delete(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Deletes the instance object at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI of the object to delete
* @return true if deletion was successful
* @throws IOException if deletion fails
*/
@Retryable(includes = {IOException.class})
boolean deleteInstanceResource(@Nullable String namespace, URI uri) throws IOException;
/**
* Creates a new directory at the given URI.
*
* @param tenantId the tenant identifier (optional)
* @param tenantId the tenant identifier
* @param namespace the namespace (optional)
* @param uri the URI of the directory to create
* @return the URI of the created directory
* @throws IOException if creation fails
*/
@Retryable(includes = {IOException.class})
URI createDirectory(@Nullable String tenantId, @Nullable String namespace, URI uri) throws IOException;
URI createDirectory(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Creates a new instance directory at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace
* @param uri the URI of the directory to create
* @return the URI of the created directory
* @throws IOException if creation fails
*/
@Retryable(includes = {IOException.class})
URI createInstanceDirectory(String namespace, URI uri) throws IOException;
/**
* Moves an object from one URI to another.
*
* @param tenantId the tenant identifier (optional)
* @param tenantId the tenant identifier
* @param namespace the namespace (optional)
* @param from the source URI
* @param to the destination URI
@@ -183,7 +288,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
* @throws IOException if moving fails
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
URI move(@Nullable String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;
URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;
/**
* Deletes all objects that match the given URI prefix.
@@ -226,23 +331,32 @@ public interface StorageInterface extends AutoCloseable, Plugin {
}
/**
* Builds the internal storage path based on tenant ID and URI.
* Builds the internal storage path based on the URI.
*
* @param tenantId the tenant identifier (maybe null)
* @param uri the URI of the object
* @return a normalized internal path
*/
default String getPath(@Nullable String tenantId, URI uri) {
default String getPath(URI uri) {
if (uri == null) {
uri = URI.create("/");
}
parentTraversalGuard(uri);
String path = uri.getPath();
if (tenantId != null) {
path = tenantId + (path.startsWith("/") ? path : "/" + path);
}
path = path.replaceFirst("^/", "");
return path;
}
/**
* Builds the internal storage path based on tenant ID and URI.
*
* @param tenantId the tenant identifier
* @param uri the URI of the object
* @return a normalized internal path
*/
default String getPath(String tenantId, URI uri) {
String path = getPath(uri);
path = tenantId + (path.startsWith("/") ? path : "/" + path);
return path;
}

View File

@@ -27,14 +27,21 @@ public record TestSuiteRunResult(
public static TestSuiteRunResult of(String id, String testSuiteId, String namespace, String flowId, Instant startDate, Instant endDate, List<UnitTestResult> results) {
boolean allSkipped = true;
boolean anyFailed = false;
for (UnitTestResult result : results) {
if(!result.state().equals(TestState.SKIPPED)) {
allSkipped = false;
}
if(result.state().equals(TestState.ERROR) || result.state().equals(TestState.FAILED)) {
if (result.state().equals(TestState.FAILED)) {
anyFailed = true;
}
if (result.state().equals(TestState.ERROR)) {
return new TestSuiteRunResult(id, testSuiteId, namespace, flowId, result.state(), startDate, endDate, results);
}
}
if (anyFailed) {
return new TestSuiteRunResult(id, testSuiteId, namespace, flowId, TestState.FAILED, startDate, endDate, results);
}
var state = allSkipped ? TestState.SKIPPED : TestState.SUCCESS;
return new TestSuiteRunResult(id, testSuiteId, namespace, flowId, state, startDate, endDate, results);
}

View File

@@ -4,6 +4,7 @@ import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -13,6 +14,7 @@ import java.util.List;
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class UnitTest {
@NotNull
private String id;

View File

@@ -1,37 +1,193 @@
package io.kestra.core.utils;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
public class Either<L, R> {
private final Optional<L> left;
private final Optional<R> right;
private Either(Optional<L> left, Optional<R> right) {
this.left = left;
this.right = right;
}
/**
* Simple {@link Either} monad type.
*
* @param <L> the {@link Left} type.
* @param <R> the {@link Right} type.
*/
public abstract sealed class Either<L, R> permits Either.Left, Either.Right {
public static <L, R> Either<L, R> left(L value) {
return new Either<>(Optional.ofNullable(value), Optional.empty());
return new Left<>(value);
}
public boolean isLeft() {
return this.left.isPresent();
}
public L getLeft() {
return this.left.get();
}
public static <L, R> Either<L, R> right(R value) {
return new Either<>(Optional.empty(), Optional.ofNullable(value));
return new Right<>(value);
}
public boolean isRight() {
return this.right.isPresent();
/**
* Returns {@code true} if this is a {@link Left}, {@code false} otherwise.
*/
public abstract boolean isLeft();
/**
* Returns {@code true} if this is a {@link Right}, {@code false} otherwise.
*/
public abstract boolean isRight();
/**
* Returns the left value.
*
* @throws NoSuchElementException if is not left.
*/
public abstract L getLeft();
/**
* Returns the right value.
*
* @throws NoSuchElementException if is not right.
*/
public abstract R getRight();
public LeftProjection<L, R> left() {
return new LeftProjection<>(this);
}
public R getRight() {
return this.right.get();
public RightProjection<L, R> right() {
return new RightProjection<>(this);
}
}
public <T> T fold(final Function<L, T> fl, final Function<R, T> fr) {
return isLeft() ? fl.apply(getLeft()) : fr.apply(getRight());
}
public static final class Left<L, R> extends Either<L, R> {
private final L value;
private Left(L value) {
this.value = value;
}
/**
* @return {@code true}.
*/
@Override
public boolean isLeft() {
return true;
}
/**
* @return {@code false}.
*/
@Override
public boolean isRight() {
return false;
}
@Override
public L getLeft() {
return value;
}
@Override
public R getRight() {
throw new NoSuchElementException("This is Left");
}
}
public static final class Right<L, R> extends Either<L, R> {
private final R value;
private Right(R value) {
this.value = value;
}
/**
* @return {@code false}.
*/
@Override
public boolean isLeft() {
return false;
}
/**
* @return {@code true}.
*/
@Override
public boolean isRight() {
return true;
}
@Override
public L getLeft() {
throw new NoSuchElementException("This is Right");
}
@Override
public R getRight() {
return value;
}
}
public static class LeftProjection<L, R> {
private final Either<L, R> either;
LeftProjection(final Either<L, R> either) {
Objects.requireNonNull(either, "either can't be null");
this.either = either;
}
public boolean exists() {
return either.isLeft();
}
public L get() {
return either.getLeft();
}
public <LL> Either<LL, R> map(final Function<? super L, ? extends LL> fn) {
if (either.isLeft()) return Either.left(fn.apply(either.getLeft()));
else return Either.right(either.getRight());
}
public <LL> Either<LL, R> flatMap(final Function<? super L, Either<LL, R>> fn) {
if (either.isLeft()) return fn.apply(either.getLeft());
else return Either.right(either.getRight());
}
public Optional<L> toOptional() {
return exists() ? Optional.of(either.getLeft()) : Optional.empty();
}
}
public static class RightProjection<L, R> {
private final Either<L, R> either;
RightProjection(final Either<L, R> either) {
Objects.requireNonNull(either, "either can't be null");
this.either = either;
}
public boolean exists() {
return either.isRight();
}
public R get() {
return either.getRight();
}
public <RR> Either<L, RR> map(final Function<? super R, ? extends RR> fn) {
if (either.isRight()) return Either.right(fn.apply(either.getRight()));
else return Either.left(either.getLeft());
}
public <RR> Either<L, RR> flatMap(final Function<? super R, Either<L, RR>> fn) {
if (either.isRight()) return fn.apply(either.getRight());
else return Either.left(either.getLeft());
}
public Optional<R> toOptional() {
return exists() ? Optional.of(either.getRight()) : Optional.empty();
}
}
}

View File

@@ -4,6 +4,7 @@ import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
@@ -118,6 +119,25 @@ public final class Enums {
));
}
/**
* Convert an object to a list of a specific enum.
* @param value the object to convert to list of enum.
* @param enumClass the class of the enum to convert to.
* @return A list of the corresponding enum type
* @param <T> The type of the enum.
* @throws IllegalArgumentException If the value does not match any enum value.
*/
public static <T extends Enum<T>> List<T> fromList(Object value, Class<T> enumClass) {
return switch (value) {
case List<?> list when !list.isEmpty() && enumClass.isInstance(list.getFirst()) -> (List<T>) list;
case List<?> list when !list.isEmpty() && list.getFirst() instanceof String ->
list.stream().map(item -> Enum.valueOf(enumClass, item.toString().toUpperCase())).collect(Collectors.toList());
case Enum<?> enumValue when enumClass.isInstance(enumValue) -> List.of(enumClass.cast(enumValue));
case String stringValue -> List.of(Enum.valueOf(enumClass, stringValue.toUpperCase()));
default -> throw new IllegalArgumentException("Field requires a " + enumClass.getSimpleName() + " or List<" + enumClass.getSimpleName() + "> value");
};
}
private Enums() {
}
}

View File

@@ -3,12 +3,16 @@ package io.kestra.core.utils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.*;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class ExecutorsUtils {
@Inject
private ThreadMainFactoryBuilder threadFactoryBuilder;
@@ -61,6 +65,29 @@ public class ExecutorsUtils {
);
}
public static void closeScheduledThreadPool(ScheduledExecutorService scheduledExecutorService, Duration gracePeriod, List<ScheduledFuture<?>> taskFutures) {
scheduledExecutorService.shutdown();
if (scheduledExecutorService.isTerminated()) {
return;
}
try {
if (!scheduledExecutorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
log.warn("Failed to shutdown the ScheduledThreadPoolExecutor during grace period, forcing it to shutdown now");
// Ensure the scheduled task reaches a terminal state to avoid possible memory leak
ListUtils.emptyOnNull(taskFutures).forEach(taskFuture -> taskFuture.cancel(true));
scheduledExecutorService.shutdownNow();
}
log.debug("Stopped scheduled ScheduledThreadPoolExecutor.");
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
log.debug("Failed to shutdown the ScheduledThreadPoolExecutor.");
}
}
private ExecutorService wrap(String name, ExecutorService executorService) {
return ExecutorServiceMetrics.monitor(
meterRegistry,

View File

@@ -55,4 +55,20 @@ public class ListUtils {
return newList;
}
public static List<?> convertToList(Object object){
if (object instanceof List<?> list) {
return list;
} else {
throw new IllegalArgumentException("%s in not an instance of List".formatted(object.getClass()));
}
}
public static List<String> convertToListString(Object object){
if (object instanceof List<?> list && (list.isEmpty() || list.getFirst() instanceof String)) {
return (List<String>) list;
} else {
throw new IllegalArgumentException("%s in not an instance of List of String".formatted(object));
}
}
}

Some files were not shown because too many files have changed in this diff Show More