Compare commits

...

88 Commits

Author SHA1 Message Date
Loïc Mathieu
2adb008dfb chore(system): simulate FIFO processing for messages of the same execution inside the JDBC executor
The JDBC executor process execution and worker task result messages concurrently for performance.
But the first thing it does is to lock the execution to avoid incosistent processing of the same execution.

By simulating a FIFO queue on execution to avoid processing concurrently the same execution (but keep processing different execution concurrently) we improve performance for concurrent task processing as we avoid the cost of lock & wait on the DB.
2025-08-25 09:16:07 +02:00
Anna Geller
552b3d7476 docs: add agents guidelines (#10875) 2025-08-25 08:53:17 +02:00
Florian Hussonnois
795f9c9a17 fix(core): add missing equals/hashcode methods on UnitTest 2025-08-22 13:29:13 +02:00
Florian Hussonnois
df430ded61 fix(system): fix count in AbstractJdbcRepository 2025-08-22 13:29:13 +02:00
Roman Acevedo
a6844e0ecf ci: fix by making inputs accept both dispatch and callable 2025-08-22 11:13:58 +02:00
Roman Acevedo
f71574cfb5 ci: simplify docker ci and push minor semver (#10848)
This PR modify our existing CI to allow publishing our docker image with 2 semver tags.
Example: for a CI on a tag v0.24.99 it will push both tags v0.24.99 and v0.24.

When this CI is settled on this repo (after one micro release for example), I will do the same for EE.

What has been done:

merge docker.yml into workflow-publish-docker.yml
make workflow-publish-docker.yml handle both tags (releases) and develop CI
when in a tag CI, extract the minor version, push it as well as the full vMAJOR.MINOR.PATCH version (see the related issue Add Multiple Semantic Version (SemVer) Tags for Docker Images #10575)
2025-08-22 10:41:59 +02:00
Nicolas K.
c5341e56e9 fix(tests): flaky consumer test (#10853)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-22 09:37:43 +02:00
Piyush Bhaskar
88b0723147 fix(flows): fixes revision restore for a flow (#10841) 2025-08-22 12:50:13 +05:30
Piyush Bhaskar
f79fcf5734 chore(versions) : bump the ui-libs (#10862) 2025-08-22 12:45:12 +05:30
Barthélémy Ledoux
cf27827f20 fix(core): when refreshing a multipanel editor, sizes are not kept (#10858) 2025-08-22 12:16:51 +05:30
Barthélémy Ledoux
408b6b97a7 fix(flows): in no-code refreshing a message will update its value (#10851) 2025-08-21 22:32:39 +02:00
Piyush Bhaskar
d57753e62b fix(core): Choose File button and hover on btn text in light theme (#10857) 2025-08-21 23:20:27 +05:30
Nicolas K.
2571eaf56c fix: #4442 extract tenant id from file path (#10850)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-21 17:40:10 +02:00
Florian Hussonnois
37ea7f31a0 feat(flows): add pebble expression support for all input defaults (#9762)
Fix: #9762
2025-08-21 17:14:15 +02:00
Roman Acevedo
478c911718 fix(tests): a TestSuite with any ERROR could not end in ERROR 2025-08-21 16:43:16 +02:00
Piyush Bhaskar
1bce0d673f fix(core): update params for flow navigation (#10847) 2025-08-21 19:43:26 +05:30
Florian Hussonnois
609a5b8066 feat(flow): add support for optional flow outputs
Add the new required property to the flow output
model. By default, all flow's outputs are required

Fixes: kestra-io/kestra-ee#3969
2025-08-21 16:09:22 +02:00
Florian Hussonnois
6182015a6f feat(system): report additional server events
Part-of: kestra-io/kestra-ee#3014
2025-08-21 14:50:11 +02:00
brian.mulier
6f8044f347 fix(ai): make sure accept / decline AI banner doesn't hide code editor (#10835)
closes kestra-io/kestra-ee#4273
2025-08-21 14:45:32 +02:00
brian.mulier
b3b7596bf4 fix(ai): AI Copilot instructions for better results (#10835)
closes kestra-io/kestra-ee#4273
2025-08-21 14:45:32 +02:00
brian.mulier
36b1c14424 fix(ai): add instructions for AI Copilot configuration if not enabled yet (#10835)
closes kestra-io/kestra-ee#4273
2025-08-21 14:45:32 +02:00
brian-mulier-p
1aef9578d9 fix(kv): Set task should convert numbers to string if kvType == STRING (#10836) 2025-08-21 09:33:03 +02:00
Piyush Bhaskar
6a07e3c048 fix(core): truncate the overflowing text from button when zoomed #10775 2025-08-21 01:14:28 +05:30
Owen Warnack
b643954921 fix(ui): show lock icon for namespace in No-Code editor (#10667)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-08-21 00:06:23 +05:30
Piyush Bhaskar
fe1ae290d0 fix(core): show validation button icon 2025-08-20 23:48:14 +05:30
Hamza
6ae2fde78f fix(core): truncate the overflowing text from button when zoomed (#10775)
Co-authored-by: Piyush Bhaskar <impiyush0012@gmail.com>
2025-08-20 23:28:53 +05:30
Loïc Mathieu
260f5c427b fix(system): properly close the ScheduledExecutorService tasks
This avoids having running threads while the component is supposed to be closed.
2025-08-20 14:23:13 +02:00
dependabot[bot]
f2dbc41cdb build(deps): bump opensearchRestVersion from 3.1.0 to 3.2.0
Bumps `opensearchRestVersion` from 3.1.0 to 3.2.0.

Updates `org.opensearch.client:opensearch-rest-client` from 3.1.0 to 3.2.0
- [Release notes](https://github.com/opensearch-project/OpenSearch/releases)
- [Changelog](https://github.com/opensearch-project/OpenSearch/blob/main/CHANGELOG.md)
- [Commits](https://github.com/opensearch-project/OpenSearch/compare/3.1.0...3.2.0)

Updates `org.opensearch.client:opensearch-rest-high-level-client` from 3.1.0 to 3.2.0
- [Release notes](https://github.com/opensearch-project/OpenSearch/releases)
- [Changelog](https://github.com/opensearch-project/OpenSearch/blob/main/CHANGELOG.md)
- [Commits](https://github.com/opensearch-project/OpenSearch/compare/3.1.0...3.2.0)

---
updated-dependencies:
- dependency-name: org.opensearch.client:opensearch-rest-client
  dependency-version: 3.2.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
- dependency-name: org.opensearch.client:opensearch-rest-high-level-client
  dependency-version: 3.2.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-20 14:00:48 +02:00
dependabot[bot]
39fdb7ed5d build(deps): bump com.github.oshi:oshi-core from 6.8.2 to 6.8.3
Bumps [com.github.oshi:oshi-core](https://github.com/oshi/oshi) from 6.8.2 to 6.8.3.
- [Release notes](https://github.com/oshi/oshi/releases)
- [Changelog](https://github.com/oshi/oshi/blob/master/CHANGELOG.md)
- [Commits](https://github.com/oshi/oshi/compare/oshi-parent-6.8.2...oshi-parent-6.8.3)

---
updated-dependencies:
- dependency-name: com.github.oshi:oshi-core
  dependency-version: 6.8.3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-20 13:57:41 +02:00
dependabot[bot]
c6b9c445c5 build(deps): bump com.github.docker-java:docker-java-transport-httpclient5
Bumps [com.github.docker-java:docker-java-transport-httpclient5](https://github.com/docker-java/docker-java) from 3.5.3 to 3.6.0.
- [Release notes](https://github.com/docker-java/docker-java/releases)
- [Changelog](https://github.com/docker-java/docker-java/blob/main/CHANGELOG.md)
- [Commits](https://github.com/docker-java/docker-java/compare/3.5.3...3.6.0)

---
updated-dependencies:
- dependency-name: com.github.docker-java:docker-java-transport-httpclient5
  dependency-version: 3.6.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-20 13:57:10 +02:00
dependabot[bot]
da8992f130 build(deps): bump com.google.cloud:libraries-bom from 26.65.0 to 26.66.0
Bumps [com.google.cloud:libraries-bom](https://github.com/googleapis/java-cloud-bom) from 26.65.0 to 26.66.0.
- [Release notes](https://github.com/googleapis/java-cloud-bom/releases)
- [Changelog](https://github.com/googleapis/java-cloud-bom/blob/main/release-please-config.json)
- [Commits](https://github.com/googleapis/java-cloud-bom/compare/v26.65.0...v26.66.0)

---
updated-dependencies:
- dependency-name: com.google.cloud:libraries-bom
  dependency-version: 26.66.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-20 13:56:43 +02:00
dependabot[bot]
e448690086 build(deps): bump software.amazon.awssdk:bom from 2.32.21 to 2.32.26
Bumps software.amazon.awssdk:bom from 2.32.21 to 2.32.26.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.32.26
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-20 13:56:21 +02:00
Florian Hussonnois
3929bf6172 feat(system): add distinct server-events for reporting
Refactor the services used to generate periodic reports on server usage.

Related-to: kestra-io/kestra-ee#3014
2025-08-20 12:20:31 +02:00
Piyush Bhaskar
ab9951466d feat(core): implement tab tracking on editor events (#10781) 2025-08-20 14:26:46 +05:30
brian.mulier
ef59a6de26 fix(tests): add test on task runners to assert they can work and transmit their wdir 2025-08-20 09:55:01 +02:00
lizi3
0a64ae7e63 perf(sql):Optimize SQL performance by replacing SQL_CALC_FOUND_ROWS with COUNT(*) 2025-08-20 09:54:39 +02:00
lizi3
8c3cd2856a perf(sql):Optimize SQL performance by replacing SQL_CALC_FOUND_ROWS with COUNT(*) 2025-08-20 09:54:39 +02:00
AJ Emerich
6def8ef831 fix(webhook-trigger): fix documentation typos (#10790) 2025-08-20 09:18:42 +02:00
Piyush Bhaskar
0cc1bffc20 refactor(core): new No Data page when no versioned plugins (#10751)
* refactor(core): new No Data page when no versioned plugins

* chore(core): localize to languages other than english (#10752)

Co-authored-by: GitHub Action <actions@github.com>

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: GitHub Action <actions@github.com>
2025-08-20 12:23:51 +05:30
Barthélémy Ledoux
3bdf55a649 refactor(flows): rename MultiPanelFlowEditorView and extract MultiPanelEditorTabs component (#10783) 2025-08-19 16:31:34 +02:00
Barthélémy Ledoux
767a375292 fix(ui): update context docs menu links and section titles (#10768) 2025-08-19 16:24:16 +02:00
brian.mulier
1509ce9b98 fix(core): change cache policy on files returned by webserver that needs to stay fresh
closes #7499
2025-08-19 11:52:48 +02:00
brian.mulier
5a3f3d3312 fix(namespaces): properly send editor content upon creating / updating ns file
part of #7499
2025-08-19 11:52:48 +02:00
Roman Acevedo
6394c337ae fix(tests): filter out ExecutionKind.TEST from FlowTriggers
- fixes Silence flow trigger on TEST-kind executions kestra-ee#4689
2025-08-19 11:03:48 +02:00
Piyush Bhaskar
be4518466f fix(kv): fixes KV creation using authStore 2025-08-19 11:39:29 +05:30
Piyush Bhaskar
543bed48c9 feat(core): changes to introduce Namespace Context (#10750) 2025-08-19 11:06:58 +05:30
Barthélémy Ledoux
5e57d11b73 refactor: make auth store use pinia (#10558)
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
Co-authored-by: Piyush-r-bhaskar <impiyush0012@gmail.com>
2025-08-18 15:32:13 +02:00
Piyush Bhaskar
98189392a2 fix(core): show flow graph inside blueprint detail (#10771) 2025-08-18 17:59:13 +05:30
Piyush Bhaskar
ac9a01964a refactor(executions): implement splitter for execution outputs (#10677) 2025-08-18 17:58:50 +05:30
Piyush Bhaskar
8479323f97 refactor: migrate Splitpanes for Element Plus el-splitter. (#10669)
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-08-18 16:05:34 +05:30
brian.mulier
4b80b92423 refacto(namespaces): uniformize namespaces usage & retrieval
closes kestra-io/kestra-ee#3500
closes kestra-io/kestra-ee#3356
closes kestra-io/kestra-ee#3163
closes kestra-io/kestra-ee#4713
closes kestra-io/kestra-ee#3210
closes kestra-io/kestra#10700
related to kestra-io/kestra#10701
2025-08-18 12:18:47 +02:00
brian.mulier
2e7d714bcb chore(deps): bump ui-libs from 0.0.237 to 0.0.238 2025-08-18 12:18:47 +02:00
Roman Acevedo
73cf7f04fb test(e2e): make sure used docker image is local 2025-08-18 12:03:44 +02:00
Roman Acevedo
ac0ab7e8fa Revert "build(deps): bump com.gradleup.shadow from 8.3.9 to 9.0.1"
This reverts commit fa6da9bd0b.
2025-08-18 12:03:44 +02:00
Roman Acevedo
c1876e69ed test(e2e): print logs if backend failed to start 2025-08-18 12:03:44 +02:00
Roman Acevedo
cf73a80f2e test(e2e): fix e2e marked as cancelled when near timeout 2025-08-18 12:03:44 +02:00
Barthélémy Ledoux
53687f4a1f fix(core): avoid triggering hundreds of reactivity updates for each icon (#10766) 2025-08-18 11:37:37 +02:00
Florian Hussonnois
749bf94125 fix(core): fix preconditions rendering for ExecutionOutputs (#10651)
Ensure that preconditions are always re-rendered for any
new executions

Changes:
* add new fluent skipCache methods on RunContextProperty and Property
  classes

Fix: #10651
2025-08-18 09:24:58 +02:00
Nicolas K.
25a7994f63 fix(test): disable kafka concurrency queue test (#10755)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-14 16:59:21 +02:00
Anna Geller
e03c894f3a fix: spelling 2025-08-14 15:27:09 +02:00
Piyush Bhaskar
99772c1a48 fix(ui): fixes logo cut off on no permission interface (#10739) 2025-08-14 18:43:28 +05:30
Roman Acevedo
93d6b816bf fix(tests): namespace binding was breaking filtering in Flow page
fixes https://github.com/kestra-io/kestra-ee/issues/4691

the additional namespace binding in Tabs was added in PR https://github.com/kestra-io/kestra/pull/10543 to solve the special case of Namespace creation
2025-08-14 13:39:42 +02:00
Nicolas K.
a3b0512bec feat(storages): #10636 add tenant id to mock trigger (#10749)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-14 12:07:21 +02:00
Loïc Mathieu
265f72b629 fix(execution): parallel flowable may not ends all child flowable
Parallel flowable tasks like `Parallel`, `Dag` and `ForEach` are racy. When a task fail in a branch, other concurrent branches that have flowable may never ends.
We make sure that all children are terminated when a flowable is itself terminated.

Fixes #6780
2025-08-14 12:06:15 +02:00
YannC
07a8d9a665 fix: avoid file being displayed as diff in namespace file editor (#10746)
close #10744
2025-08-14 10:38:33 +02:00
Piyush Bhaskar
59bd607db2 refactor(misc): add misc module to override (#10737) 2025-08-14 13:48:29 +05:30
Nicolas K.
1618815df4 Feat/add get path without tenant (#10741)
* feat(storages): #10636 add get path without tenant id

* feat(storages): #10636 remove first / from get path method

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-13 17:48:03 +02:00
Nicolas K.
a2c3799ab7 feat(storages): #10636 add get path without tenant id (#10740)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-13 16:51:09 +02:00
Loïc Mathieu
986a2b4d11 chore(ci): don't run docker PR image workflow on forks 2025-08-13 15:32:41 +02:00
Loïc Mathieu
cdd591dab7 fix(tests): makes JdbcQueueTest less flaky 2025-08-13 14:56:39 +02:00
Malaydewangan09
9f5cf5aeb9 fix(): subgroups for better readability 2025-08-13 14:41:47 +05:30
Nicolas K.
cc5f73ae06 wip(storages): add non tenant dependant method to storage interface (#10637)
* wip(storages): add non tenant dependant method to storage interface

* feat(storages): #10636 add instance method to retrieve resources without the tenant id

* fix(stores): #4353 failing unit tests after now that tenant id can't be null

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-13 11:00:25 +02:00
dependabot[bot]
e461e46a1c build(deps): bump io.micrometer:micrometer-core from 1.15.2 to 1.15.3
Bumps [io.micrometer:micrometer-core](https://github.com/micrometer-metrics/micrometer) from 1.15.2 to 1.15.3.
- [Release notes](https://github.com/micrometer-metrics/micrometer/releases)
- [Commits](https://github.com/micrometer-metrics/micrometer/compare/v1.15.2...v1.15.3)

---
updated-dependencies:
- dependency-name: io.micrometer:micrometer-core
  dependency-version: 1.15.3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:52:52 +02:00
dependabot[bot]
fa6da9bd0b build(deps): bump com.gradleup.shadow from 8.3.9 to 9.0.1
Bumps [com.gradleup.shadow](https://github.com/GradleUp/shadow) from 8.3.9 to 9.0.1.
- [Release notes](https://github.com/GradleUp/shadow/releases)
- [Commits](https://github.com/GradleUp/shadow/compare/8.3.9...9.0.1)

---
updated-dependencies:
- dependency-name: com.gradleup.shadow
  dependency-version: 9.0.1
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:52:21 +02:00
dependabot[bot]
3cb6815eac build(deps): bump org.assertj:assertj-core from 3.27.3 to 3.27.4
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.27.3 to 3.27.4.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.27.3...assertj-build-3.27.4)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-version: 3.27.4
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:19:45 +02:00
dependabot[bot]
bde9972b26 build(deps): bump actions/checkout from 4 to 5
Bumps [actions/checkout](https://github.com/actions/checkout) from 4 to 5.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:09:06 +02:00
dependabot[bot]
bc828efec9 build(deps): bump software.amazon.awssdk.crt:aws-crt
Bumps [software.amazon.awssdk.crt:aws-crt](https://github.com/awslabs/aws-crt-java) from 0.38.8 to 0.38.9.
- [Release notes](https://github.com/awslabs/aws-crt-java/releases)
- [Commits](https://github.com/awslabs/aws-crt-java/compare/v0.38.8...v0.38.9)

---
updated-dependencies:
- dependency-name: software.amazon.awssdk.crt:aws-crt
  dependency-version: 0.38.9
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:07:37 +02:00
dependabot[bot]
c62f503f1a build(deps): bump software.amazon.awssdk:bom from 2.32.16 to 2.32.21
Bumps software.amazon.awssdk:bom from 2.32.16 to 2.32.21.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.32.21
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:06:21 +02:00
dependabot[bot]
15a6323122 build(deps): bump flyingSaucerVersion from 9.13.1 to 9.13.2
Bumps `flyingSaucerVersion` from 9.13.1 to 9.13.2.

Updates `org.xhtmlrenderer:flying-saucer-core` from 9.13.1 to 9.13.2
- [Release notes](https://github.com/flyingsaucerproject/flyingsaucer/releases)
- [Changelog](https://github.com/flyingsaucerproject/flyingsaucer/blob/main/CHANGELOG.md)
- [Commits](https://github.com/flyingsaucerproject/flyingsaucer/compare/v9.13.1...v9.13.2)

Updates `org.xhtmlrenderer:flying-saucer-pdf` from 9.13.1 to 9.13.2
- [Release notes](https://github.com/flyingsaucerproject/flyingsaucer/releases)
- [Changelog](https://github.com/flyingsaucerproject/flyingsaucer/blob/main/CHANGELOG.md)
- [Commits](https://github.com/flyingsaucerproject/flyingsaucer/compare/v9.13.1...v9.13.2)

---
updated-dependencies:
- dependency-name: org.xhtmlrenderer:flying-saucer-core
  dependency-version: 9.13.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
- dependency-name: org.xhtmlrenderer:flying-saucer-pdf
  dependency-version: 9.13.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:05:10 +02:00
dependabot[bot]
21cb7b497d build(deps): bump org.jooq:jooq from 3.20.5 to 3.20.6
Bumps org.jooq:jooq from 3.20.5 to 3.20.6.

---
updated-dependencies:
- dependency-name: org.jooq:jooq
  dependency-version: 3.20.6
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-13 10:03:54 +02:00
Loïc Mathieu
26cb6ef9ad fix(execution): concurrency limit didn't work with afterExecutions
This is because the execution is never considered fully terminated so concurrency limit is not handled properly.
This should also affect SLA, trigger lock, and other cleaning stuff.

The root issue is that, with a worker task from afterExecution, there are no other update on the execution itself (as it's already terminated) so no execution messages are again processed by the executor.

Because of that, the worker task result message from the afterExecution block is the last message, but unfortunatly as messages from the worker task result have no flow attached, the computation of the final termination is incorrect.
The solution is to load the flow if null inside the executor and the execution is terminated which should only occurs inside afterExecution.

Fixes #10657
Fixes #8459
Fixes #8609
2025-08-13 09:29:46 +02:00
Piyush Bhaskar
95c438515d fix(core): pass viewTypes to initYamlSource (#10704) 2025-08-13 12:32:17 +05:30
Florian Hussonnois
194ae826e5 chore(system): add WorkerJobQueueInterface to properly pass workerId on subscribe 2025-08-12 19:26:31 +02:00
Prayag
31dbecec77 fix(core): Enter key is now validating filter / refreshing data (#9630)
closes #9471

---------

Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-08-12 17:23:10 +02:00
Anna Geller
b39bcce2e8 fix(translation): close https://github.com/kestra-io/kestra/issues/9857 2025-08-12 13:00:14 +02:00
github-actions[bot]
95ac5ce8a7 chore(core): localize to languages other than english (#10697)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-08-12 12:54:34 +02:00
Piyush Bhaskar
90f913815d fix(core): fix misc store to access configs. (#10692) 2025-08-12 16:24:17 +05:30
Anna Geller
5944db5cc8 fix: translation for sample prompt (#10696) 2025-08-12 12:51:10 +02:00
298 changed files with 6577 additions and 4290 deletions

View File

@@ -2,7 +2,7 @@ name: Auto-Translate UI keys and create PR
on:
schedule:
- cron: "0 9-21 * * *" # Every hour from 9 AM to 9 PM
- cron: "0 9-21/3 * * *" # Every 3 hours from 9 AM to 9 PM
workflow_dispatch:
inputs:
retranslate_modified_keys:
@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
name: Checkout
with:
fetch-depth: 0

View File

@@ -27,7 +27,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.

View File

@@ -1,178 +0,0 @@
name: Create Docker images on Release
on:
workflow_dispatch:
inputs:
retag-latest:
description: 'Retag latest Docker images'
required: true
type: string
default: "false"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag'
required: false
type: string
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
force-download-artifact:
description: 'Force download artifact'
required: false
type: string
default: "true"
options:
- "true"
- "false"
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
jobs:
plugins:
name: List Plugins
runs-on: ubuntu-latest
outputs:
plugins: ${{ steps.plugins.outputs.plugins }}
steps:
# Checkout
- uses: actions/checkout@v4
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
# ********************************************************************************************************************
# Build
# ********************************************************************************************************************
build-artifacts:
name: Build Artifacts
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
uses: ./.github/workflows/workflow-build-artifacts.yml
docker:
name: Publish Docker
needs: [ plugins, build-artifacts ]
runs-on: ubuntu-latest
strategy:
matrix:
image:
- name: "-no-plugins"
plugins: ""
packages: jattach
python-libs: ""
- name: ""
plugins: ${{needs.plugins.outputs.plugins}}
packages: python3 python-is-python3 python3-pip curl jattach
python-libs: kestra
steps:
- uses: actions/checkout@v4
# Vars
- name: Set image name
id: vars
run: |
if [[ "${{ inputs.release-tag }}" == "" ]]; then
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
else
TAG="${{ inputs.release-tag }}"
echo "tag=${TAG}" >> $GITHUB_OUTPUT
fi
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# [workflow_dispatch]
# Download executable from GitHub Release
- name: Artifacts - Download release (workflow_dispatch)
id: download-github-release
if: github.event_name == 'workflow_dispatch' && github.event.inputs.force-download-artifact == 'false'
uses: robinraju/release-downloader@v1.12
with:
tag: ${{steps.vars.outputs.tag}}
fileName: 'kestra-*'
out-file-path: build/executable
# [workflow_call]
# Download executable from artifact
- name: Artifacts - Download executable
if: github.event_name != 'workflow_dispatch' || steps.download-github-release.outcome == 'skipped'
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker setup
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# Docker Build and push
- name: Push to Docker Hub
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
APT_PACKAGES=${{ matrix.image.packages }}
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
- name: Install regctl
if: github.event.inputs.retag-latest == 'true'
uses: regclient/actions/regctl-installer@main
- name: Retag to latest
if: github.event.inputs.retag-latest == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
- docker
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ':github-actions:'
channel: 'C02DQ1A7JLR' # _int_git channel

View File

@@ -19,7 +19,7 @@ on:
default: "no input"
jobs:
check:
timeout-minutes: 10
timeout-minutes: 15
runs-on: ubuntu-latest
env:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
@@ -32,7 +32,7 @@ jobs:
password: ${{ github.token }}
- name: Checkout kestra
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
path: kestra

View File

@@ -21,12 +21,12 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions

View File

@@ -33,13 +33,13 @@ jobs:
exit 1;
fi
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
path: kestra
# Checkout GitHub Actions
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions

View File

@@ -4,9 +4,8 @@ on:
workflow_dispatch:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: true
description: "plugins version"
required: false
type: string
push:
branches:
@@ -34,7 +33,7 @@ jobs:
if: "!startsWith(github.ref, 'refs/heads/releases')"
uses: ./.github/workflows/workflow-release.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}

View File

@@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0

View File

@@ -34,7 +34,7 @@ jobs:
fi
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0

View File

@@ -17,12 +17,12 @@ jobs:
runs-on: ubuntu-latest
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
@@ -66,12 +66,12 @@ jobs:
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions
@@ -111,12 +111,12 @@ jobs:
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
fetch-depth: 0
# Checkout GitHub Actions
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
repository: kestra-io/actions
path: actions

View File

@@ -29,7 +29,7 @@ jobs:
GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
name: Checkout - Current ref
with:
fetch-depth: 0

View File

@@ -15,7 +15,7 @@ jobs:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
fetch-depth: 0

View File

@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v5
- name: Cache Node Modules
id: cache-node-modules

View File

@@ -20,14 +20,14 @@ jobs:
steps:
# Check out
- name: Checkout - Repository
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
fetch-depth: 0
submodules: true
# Checkout GitHub Actions
- name: Checkout - Actions
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
repository: kestra-io/actions
sparse-checkout-cone-mode: true

View File

@@ -1,22 +1,37 @@
name: Publish - Docker
name: Create Docker images on Release
on:
workflow_dispatch:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
retag-latest:
description: 'Retag latest Docker images'
required: true
type: choice
default: "false"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag (by default, deduced with the ref)'
required: false
type: string
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
force-download-artifact:
description: 'Force download artifact'
required: false
type: string
type: choice
default: "true"
options:
- "true"
- "false"
workflow_call:
inputs:
plugin-version:
description: "Kestra version"
description: "Plugin version"
default: 'LATEST'
required: false
type: string
@@ -33,45 +48,93 @@ on:
description: "The Dockerhub password."
required: true
env:
PLUGIN_VERSION: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
jobs:
plugins:
name: List Plugins
runs-on: ubuntu-latest
outputs:
plugins: ${{ steps.plugins.outputs.plugins }}
steps:
# Checkout
- uses: actions/checkout@v5
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins
with: # remap LATEST-SNAPSHOT to LATEST
plugin-version: ${{ env.PLUGIN_VERSION == 'LATEST-SNAPSHOT' && 'LATEST' || env.PLUGIN_VERSION }}
# ********************************************************************************************************************
# Build
# ********************************************************************************************************************
build-artifacts:
name: Build Artifacts
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
if: ${{ inputs.force-download-artifact == 'true' }}
uses: ./.github/workflows/workflow-build-artifacts.yml
# ********************************************************************************************************************
# Docker
# ********************************************************************************************************************
publish:
name: Publish - Docker
docker:
name: Publish Docker
needs: [ plugins, build-artifacts ]
if: always()
runs-on: ubuntu-latest
needs: build-artifacts
if: |
always() &&
(needs.build-artifacts.result == 'success' ||
github.event.inputs.force-download-artifact != 'true')
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
strategy:
matrix:
image:
- tag: -no-plugins
- name: "-no-plugins"
plugins: ""
packages: jattach
plugins: false
python-libraries: ""
- tag: ""
plugins: true
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
python-libraries: kestra
python-libs: ""
- name: ""
plugins: ${{needs.plugins.outputs.plugins}}
packages: python3 python-is-python3 python3-pip curl jattach
python-libs: kestra
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
- uses: actions/checkout@v5
# Vars
- name: Set image name
id: vars
run: |
if [[ "${{ inputs.release-tag }}" == "" ]]; then
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
else
TAG="${{ inputs.release-tag }}"
echo "tag=${TAG}" >> $GITHUB_OUTPUT
fi
if [[ $GITHUB_REF == refs/tags/* ]]; then
if [[ $TAG =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
# this will remove the patch version number
MINOR_SEMVER=${TAG%.*}
echo "minor_semver=${MINOR_SEMVER}" >> $GITHUB_OUTPUT
else
echo "Tag '$TAG' is not a valid semver (vMAJOR.MINOR.PATCH), skipping minor_semver"
fi
fi
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# Download executable from artifact
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker setup
- name: Docker - Setup QEMU
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
@@ -79,66 +142,59 @@ jobs:
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Docker - Setup Docker Buildx
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Docker - Login to DockerHub
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# # Get Plugins List
- name: Plugins - Get List
uses: ./.github/actions/plugins-list
id: plugins-list
if: ${{ matrix.image.plugins}}
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
# Vars
- name: Docker - Set variables
shell: bash
id: vars
run: |
TAG=${GITHUB_REF#refs/*/}
PLUGINS="${{ matrix.image.plugins == true && steps.plugins-list.outputs.plugins || '' }}"
if [[ $TAG == v* ]]; then
TAG="${TAG}";
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
elif [[ $TAG = "develop" ]]; then
TAG="develop";
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
else
TAG="build-${{ github.run_id }}";
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
fi
echo "tag=${TAG}${{ matrix.image.tag }}" >> $GITHUB_OUTPUT
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Docker - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker Build and push
- name: Docker - Build image
- name: Push to Docker Hub
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: kestra/kestra:${{ steps.vars.outputs.tag }}
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
APT_PACKAGES=${{ matrix.image.packages }}
PYTHON_LIBRARIES=${{ matrix.image.python-libraries }}
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
- name: Install regctl
if: startsWith(github.ref, 'refs/tags/v')
uses: regclient/actions/regctl-installer@main
- name: Retag to minor semver version
if: startsWith(github.ref, 'refs/tags/v') && steps.vars.outputs.minor_semver != ''
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.minor_semver, matrix.image.name) }}
- name: Retag to latest
if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-latest == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
- docker
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ':github-actions:'
channel: 'C02DQ1A7JLR' # _int_git channel

View File

@@ -25,7 +25,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
uses: actions/checkout@v5
# Setup build
- name: Setup - Build

View File

@@ -7,6 +7,7 @@ on:
jobs:
publish:
name: Pull Request - Delete Docker
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
runs-on: ubuntu-latest
steps:
- uses: dataaxiom/ghcr-cleanup-action@v1

View File

@@ -8,17 +8,19 @@ on:
jobs:
build-artifacts:
name: Build Artifacts
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
uses: ./.github/workflows/workflow-build-artifacts.yml
publish:
name: Publish Docker
if: github.repository == github.event.pull_request.head.repo.full_name # prevent running on forks
runs-on: ubuntu-latest
needs: build-artifacts
env:
GITHUB_IMAGE_PATH: "ghcr.io/kestra-io/kestra-pr"
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
uses: actions/checkout@v5
with:
fetch-depth: 0

View File

@@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
plugin-version:
description: "Kestra version"
description: "plugins version"
default: 'LATEST'
required: false
type: string
@@ -16,7 +16,7 @@ on:
workflow_call:
inputs:
plugin-version:
description: "Kestra version"
description: "plugins version"
default: 'LATEST'
required: false
type: string
@@ -57,10 +57,10 @@ jobs:
name: Publish Docker
needs: build-artifacts
uses: ./.github/workflows/workflow-publish-docker.yml
if: startsWith(github.ref, 'refs/heads/develop') || github.event.inputs.publish-docker == 'true'
if: github.ref == 'refs/heads/develop' || inputs.publish-docker == 'true'
with:
force-download-artifact: 'false'
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
plugin-version: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
secrets:
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}

View File

@@ -27,7 +27,7 @@ jobs:
ui: ${{ steps.changes.outputs.ui }}
backend: ${{ steps.changes.outputs.backend }}
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
if: "!startsWith(github.ref, 'refs/tags/v')"
- uses: dorny/paths-filter@v3
if: "!startsWith(github.ref, 'refs/tags/v')"

305
AGENTS.md Normal file
View File

@@ -0,0 +1,305 @@
# Kestra AGENTS.md
This file provides guidance for AI coding agents working on the Kestra project. Kestra is an open-source data orchestration and scheduling platform built with Java (Micronaut) and Vue.js.
## Repository Layout
- **`core/`**: Core Kestra framework and task definitions
- **`cli/`**: Command-line interface and server implementation
- **`webserver/`**: REST API server implementation
- **`ui/`**: Vue.js frontend application
- **`jdbc-*`**: Database connector modules (H2, MySQL, PostgreSQL)
- **`script/`**: Script execution engine
- **`storage-local/`**: Local file storage implementation
- **`repository-memory/`**: In-memory repository implementation
- **`runner-memory/`**: In-memory execution runner
- **`processor/`**: Task processing engine
- **`model/`**: Data models and Data Transfer Objects
- **`platform/`**: Platform-specific implementations
- **`tests/`**: Integration test framework
- **`e2e-tests/`**: End-to-end testing suite
## Development Environment
### Prerequisites
- Java 21+
- Node.js 22+ and npm
- Python 3, pip, and python venv
- Docker & Docker Compose
- Gradle (wrapper included)
### Quick Setup with Devcontainer
The easiest way to get started is using the provided devcontainer:
1. Install VSCode Remote Development extension
2. Run `Dev Containers: Open Folder in Container...` from command palette
3. Select the Kestra root folder
4. Wait for Gradle build to complete
### Manual Setup
1. Clone the repository
2. Run `./gradlew build` to build the backend
3. Navigate to `ui/` and run `npm install`
4. Create configuration files as described below
## Configuration Files
### Backend Configuration
Create `cli/src/main/resources/application-override.yml`:
**Local Mode (H2 database):**
```yaml
micronaut:
server:
cors:
enabled: true
configurations:
all:
allowedOrigins:
- http://localhost:5173
```
**Standalone Mode (PostgreSQL):**
```yaml
kestra:
repository:
type: postgres
storage:
type: local
local:
base-path: "/app/storage"
queue:
type: postgres
tasks:
tmp-dir:
path: /tmp/kestra-wd/tmp
anonymous-usage-report:
enabled: false
datasources:
postgres:
url: jdbc:postgresql://host.docker.internal:5432/kestra
driverClassName: org.postgresql.Driver
username: kestra
password: k3str4
flyway:
datasources:
postgres:
enabled: true
locations:
- classpath:migrations/postgres
ignore-migration-patterns: "*:missing,*:future"
out-of-order: true
micronaut:
server:
cors:
enabled: true
configurations:
all:
allowedOrigins:
- http://localhost:5173
```
### Frontend Configuration
Create `ui/.env.development.local` for environment variables.
## Running the Application
### Backend
- **Local mode**: `./gradlew runLocal` (uses H2 database)
- **Standalone mode**: Use VSCode Run and Debug with main class `io.kestra.cli.App` and args `server standalone`
### Frontend
- Navigate to `ui/` directory
- Run `npm run dev` for development server (port 5173)
- Run `npm run build` for production build
## Building and Testing
### Backend
```bash
# Build the project
./gradlew build
# Run tests
./gradlew test
# Run specific module tests
./gradlew :core:test
# Clean build
./gradlew clean build
```
### Frontend
```bash
cd ui
npm install
npm run test
npm run lint
npm run build
```
### End-to-End Tests
```bash
# Build and start E2E tests
./build-and-start-e2e-tests.sh
# Or use the Makefile
make install
make install-plugins
make start-standalone-postgres
```
## Development Guidelines
### Java Backend
- Use Java 21 features
- Follow Micronaut framework patterns
- Add Swagger annotations for API documentation
- Use annotation processors (enable in IDE)
- Set `MICRONAUT_ENVIRONMENTS=local,override` for custom config
- Set `KESTRA_PLUGINS_PATH` for custom plugin loading
### Vue.js Frontend
- Vue 3 with Composition API
- TypeScript for type safety
- Vite for build tooling
- ESLint and Prettier for code quality
- Component-based architecture in `src/components/`
### Code Style
- Follow `.editorconfig` settings
- Use 4 spaces for Java, 2 spaces for YAML/JSON/CSS
- Enable format on save in VSCode
- Use Prettier for frontend code formatting
## Testing Strategy
### Backend Testing
- Unit tests in `src/test/java/`
- Integration tests in `tests/` module
- Use Micronaut test framework
- Test both local and standalone modes
### Frontend Testing
- Unit tests with Jest
- E2E tests with Playwright
- Component testing with Storybook
- Run `npm run test:unit` and `npm run test:e2e`
## Plugin Development
### Creating Plugins
- Follow the [Plugin Developer Guide](https://kestra.io/docs/plugin-developer-guide/)
- Place JAR files in `KESTRA_PLUGINS_PATH`
- Use the plugin template structure
- Test with both local and standalone modes
### Plugin Loading
- Set `KESTRA_PLUGINS_PATH` environment variable
- Use devcontainer mounts for local development
- Plugins are loaded at startup
## Common Issues and Solutions
### JavaScript Heap Out of Memory
Set `NODE_OPTIONS=--max-old-space-size=4096` environment variable.
### CORS Issues
Ensure backend CORS is configured for `http://localhost:5173` when using frontend dev server.
### Database Connection Issues
- Use `host.docker.internal` instead of `localhost` when connecting from devcontainer
- Verify PostgreSQL is running and accessible
- Check database credentials and permissions
### Gradle Build Issues
- Clear Gradle cache: `./gradlew clean`
- Check Java version compatibility
- Verify all dependencies are available
## Pull Request Guidelines
### Before Submitting
1. Run all tests: `./gradlew test` and `npm test`
2. Check code formatting: `./gradlew spotlessCheck`
3. Verify CORS configuration if changing API
4. Test both local and standalone modes
5. Update documentation for user-facing changes
### Commit Messages
- Follow conventional commit format
- Use present tense ("Add feature" not "Added feature")
- Reference issue numbers when applicable
- Keep commits focused and atomic
### Review Checklist
- [ ] All tests pass
- [ ] Code follows project style guidelines
- [ ] Documentation is updated
- [ ] No breaking changes without migration guide
- [ ] CORS properly configured if API changes
- [ ] Both local and standalone modes tested
## Useful Commands
```bash
# Quick development commands
./gradlew runLocal # Start local backend
./gradlew :ui:build # Build frontend
./gradlew clean build # Clean rebuild
npm run dev # Start frontend dev server
make install # Install Kestra locally
make start-standalone-postgres # Start with PostgreSQL
# Testing commands
./gradlew test # Run all backend tests
./gradlew :core:test # Run specific module tests
npm run test # Run frontend tests
npm run lint # Lint frontend code
```
## Getting Help
- Open a [GitHub issue](https://github.com/kestra-io/kestra/issues)
- Join the [Kestra Slack community](https://kestra.io/slack)
- Check the [main documentation](https://kestra.io/docs)
## Environment Variables
| Variable | Description | Default |
|----------|-------------|---------|
| `MICRONAUT_ENVIRONMENTS` | Custom config environments | `local,override` |
| `KESTRA_PLUGINS_PATH` | Path to custom plugins | `/workspaces/kestra/local/plugins` |
| `NODE_OPTIONS` | Node.js options | `--max-old-space-size=4096` |
| `JAVA_HOME` | Java installation path | `/usr/java/jdk-21` |
Remember: Always test your changes in both local and standalone modes, and ensure CORS is properly configured for frontend development.

View File

@@ -7,7 +7,7 @@ set -e
# run tests on this image
LOCAL_IMAGE_VERSION="local-e2e"
LOCAL_IMAGE_VERSION="local-e2e-$(date +%s)"
echo "Running E2E"
echo "Start time: $(date '+%Y-%m-%d %H:%M:%S')"
@@ -15,6 +15,7 @@ start_time=$(date +%s)
echo ""
echo "Building the image for this current repository"
make clean
make build-docker VERSION=$LOCAL_IMAGE_VERSION
end_time=$(date +%s)
@@ -32,7 +33,7 @@ echo "npm i"
npm i
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
end_time2=$(date +%s)
elapsed2=$(( end_time2 - start_time2 ))

View File

@@ -10,24 +10,21 @@ import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.PluginDefaultService;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.io.watch.FileWatchConfiguration;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
@Singleton
@Slf4j
@Requires(property = "micronaut.io.watch.enabled", value = "true")
@@ -49,13 +46,9 @@ public class FileChangedEventListener {
@Inject
protected FlowListenersInterface flowListeners;
@Nullable
@Value("${micronaut.io.watch.tenantId}")
private String tenantId;
FlowFilesManager flowFilesManager;
private List<FlowWithPath> flows = new ArrayList<>();
private List<FlowWithPath> flows = new CopyOnWriteArrayList<>();
private boolean isStarted = false;
@@ -113,8 +106,6 @@ public class FileChangedEventListener {
}
public void startListening(List<Path> paths) throws IOException, InterruptedException {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
for (Path path : paths) {
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
}
@@ -157,7 +148,7 @@ public class FileChangedEventListener {
flows.add(FlowWithPath.of(flow.get(), filePath.toString()));
}
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(filePath), content));
log.info("Flow {} from file {} has been created or modified", flow.get().getId(), entry);
}
@@ -201,8 +192,6 @@ public class FileChangedEventListener {
}
private void loadFlowsFromFolder(Path folder) {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
try {
Files.walkFileTree(folder, new SimpleFileVisitor<Path>() {
@Override
@@ -222,7 +211,7 @@ public class FileChangedEventListener {
if (flow.isPresent() && flows.stream().noneMatch(flowWithPath -> flowWithPath.uidWithoutRevision().equals(flow.get().uidWithoutRevision()))) {
flows.add(FlowWithPath.of(flow.get(), file.toString()));
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(tenantId, content));
flowFilesManager.createOrUpdateFlow(GenericFlow.fromYaml(getTenantIdFromPath(file), content));
}
}
return FileVisitResult.CONTINUE;
@@ -246,10 +235,8 @@ public class FileChangedEventListener {
}
private Optional<FlowWithSource> parseFlow(String content, Path entry) {
String tenantId = this.tenantId != null ? this.tenantId : MAIN_TENANT;
try {
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(tenantId, content, false);
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(getTenantIdFromPath(entry), content, false);
modelValidator.validate(flow);
return Optional.of(flow);
} catch (ConstraintViolationException | FlowProcessingException e) {
@@ -273,4 +260,8 @@ public class FileChangedEventListener {
private Path buildPath(FlowInterface flow) {
return fileWatchConfiguration.getPaths().getFirst().resolve(flow.uidWithoutRevision() + ".yml");
}
private String getTenantIdFromPath(Path path) {
return path.getFileName().toString().split("_")[0];
}
}

View File

@@ -212,7 +212,7 @@ kestra:
retention: 30d
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/usages
uri: https://api.kestra.io/v1/server-events/
initial-delay: 5m
fixed-delay: 1h

View File

@@ -1,6 +1,7 @@
package io.kestra.cli.services;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.utils.Await;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
@@ -71,7 +72,9 @@ class FileChangedEventListenerTest {
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀
""";
Files.write(Path.of(FILE_WATCH + "/myflow.yaml"), flow.getBytes());
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, flow);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), flow.getBytes());
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isPresent(),
Duration.ofMillis(100),
@@ -83,7 +86,7 @@ class FileChangedEventListenerTest {
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
// delete the flow
Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "myflow").isEmpty(),
Duration.ofMillis(100),
@@ -110,7 +113,8 @@ class FileChangedEventListenerTest {
values:
message: Hello World!
""";
Files.write(Path.of(FILE_WATCH + "/plugin-default.yaml"), pluginDefault.getBytes());
GenericFlow genericFlow = GenericFlow.fromYaml(MAIN_TENANT, pluginDefault);
Files.write(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"), pluginDefault.getBytes());
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isPresent(),
Duration.ofMillis(100),
@@ -122,7 +126,7 @@ class FileChangedEventListenerTest {
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
// delete both files
Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));
Files.delete(Path.of(FILE_WATCH + "/" + genericFlow.uidWithoutRevision() + ".yaml"));
Await.until(
() -> flowRepository.findById(MAIN_TENANT, "io.kestra.tests.watch", "pluginDefault").isEmpty(),
Duration.ofMillis(100),

View File

@@ -53,6 +53,8 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static io.kestra.core.docs.AbstractClassDocumentation.flattenWithoutType;
import static io.kestra.core.docs.AbstractClassDocumentation.required;
import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
@@ -92,12 +94,16 @@ public class JsonSchemaGenerator {
}
public <T> Map<String, Object> schemas(Class<? extends T> cls, boolean arrayOf, List<String> allowedPluginTypes) {
return this.schemas(cls, arrayOf, allowedPluginTypes, false);
}
public <T> Map<String, Object> schemas(Class<? extends T> cls, boolean arrayOf, List<String> allowedPluginTypes, boolean withOutputs) {
SchemaGeneratorConfigBuilder builder = new SchemaGeneratorConfigBuilder(
SchemaVersion.DRAFT_7,
OptionPreset.PLAIN_JSON
);
this.build(builder, true, allowedPluginTypes);
this.build(builder, true, allowedPluginTypes, withOutputs);
SchemaGeneratorConfig schemaGeneratorConfig = builder.build();
@@ -249,6 +255,10 @@ public class JsonSchemaGenerator {
}
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7, List<String> allowedPluginTypes) {
this.build(builder, draft7, allowedPluginTypes, false);
}
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7, List<String> allowedPluginTypes, boolean withOutputs) {
// builder.withObjectMapper(builder.getObjectMapper().configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false));
builder
.with(new JakartaValidationModule(
@@ -430,6 +440,13 @@ public class JsonSchemaGenerator {
if (pluginAnnotation.beta()) {
collectedTypeAttributes.put("$beta", true);
}
if (withOutputs) {
Map<String, Object> outputsSchema = this.outputs(null, scope.getType().getErasedType());
collectedTypeAttributes.set("outputs", context.getGeneratorConfig().createObjectNode().pojoNode(
flattenWithoutType(AbstractClassDocumentation.properties(outputsSchema), required(outputsSchema))
));
}
}
// handle deprecated tasks

View File

@@ -62,6 +62,7 @@ public record ServiceUsage(
List<DailyServiceStatistics> statistics = Arrays
.stream(ServiceType.values())
.filter(it -> !it.equals(ServiceType.INVALID))
.map(type -> of(from, to, repository, type, interval))
.toList();
return new ServiceUsage(statistics);

View File

@@ -1,74 +0,0 @@
package io.kestra.core.models.collectors;
import io.kestra.core.models.ServerType;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
import java.time.Instant;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Set;
@SuperBuilder(toBuilder = true)
@Getter
@Jacksonized
@Introspected
@AllArgsConstructor
public class Usage {
@NotNull
private final String uuid;
@NotNull
private final String startUuid;
@NotNull
private final String instanceUuid;
@NotNull
private final ServerType serverType;
@NotNull
private final String version;
@NotNull
private final ZoneId zoneId;
@Nullable
private final String uri;
@Nullable
private final Set<String> environments;
@NotNull
private final Instant startTime;
@Valid
private final HostUsage host;
@Valid
private final ConfigurationUsage configurations;
@Valid
private final List<PluginUsage> plugins;
@Valid
private final FlowUsage flows;
@Valid
private final ExecutionUsage executions;
@Valid
@Nullable
private ServiceUsage services;
@Valid
@Nullable
private List<PluginMetric> pluginMetrics;
}

View File

@@ -1040,6 +1040,16 @@ public class Execution implements DeletedInterface, TenantInterface {
return result;
}
/**
* Find all children of this {@link TaskRun}.
*/
public List<TaskRun> findChildren(TaskRun parentTaskRun) {
return taskRunList.stream()
.filter(taskRun -> parentTaskRun.getId().equals(taskRun.getParentTaskRunId()))
.toList();
}
public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
return (withCurrent ?
Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :

View File

@@ -4,6 +4,8 @@ import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.flows.input.*;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.ConstraintViolationException;
@@ -16,6 +18,8 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.function.Function;
@SuppressWarnings("deprecation")
@SuperBuilder
@Getter
@@ -78,7 +82,7 @@ public abstract class Input<T> implements Data {
@Schema(
title = "The default value to use if no value is specified."
)
T defaults;
Property<T> defaults;
@Schema(
title = "The display name of the input."

View File

@@ -43,4 +43,11 @@ public class Output implements Data {
Type type;
String displayName;
/**
* Specifies whether the output is required or not.
* <p>
* By default, an output is always required.
*/
Boolean required;
}

View File

@@ -6,19 +6,21 @@ import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
/**
* Represents a
* Represents an input along with its associated value and validation state.
*
* @param input The flow's {@link Input}.
* @param value The flow's input value/data.
* @param enabled Specify whether the input is enabled.
* @param exception The input validation exception.
* @param input The {@link Input} definition of the flow.
* @param value The provided value for the input.
* @param enabled {@code true} if the input is enabled; {@code false} otherwise.
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
* @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
*/
public record InputAndValue(
Input<?> input,
Object value,
boolean enabled,
boolean isDefault,
ConstraintViolationException exception) {
/**
* Creates a new {@link InputAndValue} instance.
*
@@ -26,6 +28,6 @@ public record InputAndValue(
* @param value The value.
*/
public InputAndValue(@NotNull Input<?> input, @Nullable Object value) {
this(input, value, true, null);
this(input, value, true, false, null);
}
}

View File

@@ -68,6 +68,19 @@ public class Property<T> {
String getExpression() {
return expression;
}
/**
* Returns a new {@link Property} with no cached rendered value,
* so that the next render will evaluate its original Pebble expression.
* <p>
* The returned property will still cache its rendered result.
* To re-evaluate on a subsequent render, call {@code skipCache()} again.
*
* @return a new {@link Property} without a pre-rendered value
*/
public Property<T> skipCache() {
return Property.ofExpression(expression);
}
/**
* Build a new Property object with a value already set.<br>
@@ -132,8 +145,8 @@ public class Property<T> {
*
* @see io.kestra.core.runners.RunContextProperty#as(Class)
*/
public static <T> T as(Property<T> property, RunContext runContext, Class<T> clazz) throws IllegalVariableEvaluationException {
return as(property, runContext, clazz, Map.of());
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
return as(property, context, clazz, Map.of());
}
/**
@@ -143,9 +156,9 @@ public class Property<T> {
*
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
*/
public static <T> T as(Property<T> property, RunContext runContext, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
String rendered = runContext.render(property.expression, variables);
String rendered = context.render(property.expression, variables);
property.value = MAPPER.convertValue(rendered, clazz);
}
@@ -159,8 +172,8 @@ public class Property<T> {
*
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
*/
public static <T, I> T asList(Property<T> property, RunContext runContext, Class<I> itemClazz) throws IllegalVariableEvaluationException {
return asList(property, runContext, itemClazz, Map.of());
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
return asList(property, context, itemClazz, Map.of());
}
/**
@@ -171,7 +184,7 @@ public class Property<T> {
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
*/
@SuppressWarnings("unchecked")
public static <T, I> T asList(Property<T> property, RunContext runContext, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
if (property.value == null) {
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
try {
@@ -179,7 +192,7 @@ public class Property<T> {
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
// Doing that allows us to, if it's an expression, first render then read it as a list.
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
property.value = MAPPER.readValue(runContext.render(property.expression, variables), type);
property.value = MAPPER.readValue(context.render(property.expression, variables), type);
}
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
else {
@@ -187,9 +200,9 @@ public class Property<T> {
property.value = (T) asRawList.stream()
.map(throwFunction(item -> {
if (item instanceof String str) {
return MAPPER.convertValue(runContext.render(str, variables), itemClazz);
return MAPPER.convertValue(context.render(str, variables), itemClazz);
} else if (item instanceof Map map) {
return MAPPER.convertValue(runContext.render(map, variables), itemClazz);
return MAPPER.convertValue(context.render(map, variables), itemClazz);
}
return item;
}))

View File

@@ -0,0 +1,38 @@
package io.kestra.core.models.property;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.VariableRenderer;
import java.util.Map;
/**
* Contextual object for rendering properties.
*
* @see Property
*/
public interface PropertyContext {
String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException;
Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException;
/**
* Static helper method for creating a new {@link PropertyContext} from a given {@link VariableRenderer}.
*
* @param renderer the {@link VariableRenderer}.
* @return a new {@link PropertyContext}.
*/
static PropertyContext create(final VariableRenderer renderer) {
return new PropertyContext() {
@Override
public String render(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return renderer.render(inline, variables);
}
@Override
public Map<String, Object> render(Map<String, Object> inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return renderer.render(inline, variables);
}
};
}
}

View File

@@ -5,11 +5,9 @@ import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.MetricEntry;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.runners.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.templates.Template;
public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
@@ -34,7 +32,7 @@ public interface QueueFactoryInterface {
QueueInterface<Executor> executor();
QueueInterface<WorkerJob> workerJob();
WorkerJobQueueInterface workerJob();
QueueInterface<WorkerTaskResult> workerTaskResult();

View File

@@ -0,0 +1,12 @@
package io.kestra.core.queues;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.runners.WorkerJob;
import io.kestra.core.utils.Either;
import java.util.function.Consumer;
public interface WorkerJobQueueInterface extends QueueInterface<WorkerJob> {
Runnable subscribe(String workerId, String workerGroup, Consumer<Either<WorkerJob, DeserializationException>> consumer);
}

View File

@@ -0,0 +1,29 @@
package io.kestra.core.reporter;
public abstract class AbstractReportable<T extends Reportable.Event> implements Reportable<T> {
private final Type type;
private final ReportingSchedule schedule;
private final boolean isTenantSupported;
public AbstractReportable(Type type, ReportingSchedule schedule, boolean isTenantSupported) {
this.type = type;
this.schedule = schedule;
this.isTenantSupported = isTenantSupported;
}
@Override
public boolean isTenantSupported() {
return isTenantSupported;
}
@Override
public Type type() {
return type;
}
@Override
public ReportingSchedule schedule() {
return schedule;
}
}

View File

@@ -0,0 +1,94 @@
package io.kestra.core.reporter;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
/**
* Interface for reporting server event for a specific type.
*
* @param <T>
*/
public interface Reportable<T extends Reportable.Event> {
/**
* Gets the type of the event to report.
*/
Type type();
/**
* Gets the reporting schedule.
*/
ReportingSchedule schedule();
/**
* Generates a report for the given timestamp.
*
* @param now the time when the report is triggered.
* @return an Optional containing the report data if available.
*/
T report(Instant now, TimeInterval interval);
default T report(Instant now) {
ZonedDateTime to = now.atZone(ZoneId.systemDefault());
ZonedDateTime from = to.minus(Duration.ofDays(1));
return report(now, new TimeInterval(from, to));
}
/**
* Checks whether this reportable is enabled for scheduled reporting.
*/
boolean isEnabled();
/**
* Generates a report for the given timestamp and tenant.
*
* @param now the time when the report is triggered.
* @param tenant the tenant for which the report is triggered.
* @return the event to report.
*/
default T report(Instant now, TimeInterval interval, String tenant) {
throw new UnsupportedOperationException();
}
default T report(Instant now, String tenant) {
ZonedDateTime to = now.atZone(ZoneId.systemDefault());
ZonedDateTime from = to.minus(Duration.ofDays(1));
return report(now, new TimeInterval(from, to), tenant);
}
/**
* Checks whether this {@link Reportable} can accept a tenant.
*
* @return {@code true} a {@link #report(Instant, TimeInterval, String)} can called, Otherwise {@code false}.
*/
default boolean isTenantSupported() {
return false;
}
record TimeInterval(ZonedDateTime from, ZonedDateTime to){
public static TimeInterval of(ZonedDateTime from, ZonedDateTime to) {
return new TimeInterval(from, to);
}
}
/**
* Marker interface indicating that the returned event
* must be a structured, domain-specific object
* (not a primitive wrapper, String, collection, or other basic type).
*/
interface Event {
}
/**
* Defines the schedule for a report.
*/
interface ReportingSchedule {
/**
* Determines whether a report should run at the given instant.
*/
boolean shouldRun(Instant now);
}
}

View File

@@ -0,0 +1,40 @@
package io.kestra.core.reporter;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@Singleton
@Slf4j
public class ReportableRegistry {
private final Map<Type, Reportable<?>> reportables = new ConcurrentHashMap<>();
/**
* Creates a new {@link ReportableRegistry} instance.
*
* @param reportables The {@link Reportable reportables}
*/
@Inject
public ReportableRegistry(final List<Reportable<?>> reportables) {
reportables.forEach(reportable -> this.reportables.put(reportable.type(), reportable));
}
public void register(final Reportable<?> reportable) {
Objects.requireNonNull(reportable, "reportable must not be null");
if (reportables.containsKey(reportable.type())) {
log.warn("Event already registered for type '{}'", reportable.type());
} else {
reportables.put(reportable.type(), reportable);
}
}
public List<Reportable<?>> getAll() {
return List.copyOf(reportables.values());
}
}

View File

@@ -0,0 +1,43 @@
package io.kestra.core.reporter;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.time.Clock;
import java.time.Instant;
@Singleton
@Requires(property = "kestra.anonymous-usage-report.enabled", value = "true")
@Requires(property = "kestra.server-type")
@Slf4j
public class ReportableScheduler {
private final ReportableRegistry registry;
private final ServerEventSender sender;
private final Clock clock;
@Inject
public ReportableScheduler(ReportableRegistry registry, ServerEventSender sender) {
this.registry = registry;
this.sender = sender;
this.clock = Clock.systemDefaultZone();
}
@Scheduled(fixedDelay = "5m", initialDelay = "${kestra.anonymous-usage-report.initial-delay}")
public void tick() {
Instant now = clock.instant();
for (Reportable<?> r : registry.getAll()) {
if (r.isEnabled() && r.schedule().shouldRun(now)) {
try {
Object value = r.report(now);
if (value != null) sender.send(now, r.type(), value);
} catch (Exception e) {
log.debug("Failed to send report for event-type '{}'", r.type(), e);
}
}
}
}
}

View File

@@ -0,0 +1,57 @@
package io.kestra.core.reporter;
import io.kestra.core.reporter.Reportable.ReportingSchedule;
import java.time.Duration;
import java.time.Instant;
/**
* Utility class providing common implementations of {@link Reportable.ReportingSchedule}.
*/
public class Schedules {
/**
* Creates a reporting schedule that triggers after the specified period has elapsed
* since the last execution.
*
* @param period the duration between successive runs; must be positive
* @return a {@link Reportable.ReportingSchedule} that runs at the given interval
* @throws IllegalArgumentException if {@code period} is zero or negative
*/
public static ReportingSchedule every(final Duration period) {
if (period.isZero() || period.isNegative()) {
throw new IllegalArgumentException("Period must be positive");
}
return new ReportingSchedule() {
private Instant lastRun = Instant.EPOCH;
@Override
public boolean shouldRun(Instant now) {
if (Duration.between(lastRun, now).compareTo(period) >= 0) {
lastRun = now;
return true;
}
return false;
}
};
}
/**
* Creates a reporting schedule that triggers once every hour.
*
* @return a schedule running every 1 hour
*/
public static ReportingSchedule hourly() {
return every(Duration.ofHours(1));
}
/**
* Creates a reporting schedule that triggers once every day.
*
* @return a schedule running every 24 hours
*/
public static ReportingSchedule daily() {
return every(Duration.ofDays(1));
}
}

View File

@@ -0,0 +1,31 @@
package io.kestra.core.reporter;
import com.fasterxml.jackson.annotation.JsonUnwrapped;
import io.kestra.core.models.ServerType;
import lombok.Builder;
import java.time.ZoneId;
import java.time.ZonedDateTime;
/**
* Represents a Kestra Server Event.
*/
@Builder(toBuilder = true)
public record ServerEvent(
String instanceUuid,
String sessionUuid,
ServerType serverType,
String serverVersion,
ZoneId zoneId,
Object payload,
String uuid,
ZonedDateTime reportedAt
) {
@JsonUnwrapped
public Object payload() {
return payload;
}
}

View File

@@ -0,0 +1,91 @@
package io.kestra.core.reporter;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.Result;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.InstanceService;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.VersionProvider;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.hateoas.JsonError;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.time.Instant;
import java.time.ZoneId;
import java.util.UUID;
@Singleton
@Slf4j
public class ServerEventSender {
private static final String SESSION_UUID = IdUtils.create();
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();
@Inject
@Client
private ReactorHttpClient client;
@Inject
private VersionProvider versionProvider;
@Inject
private InstanceService instanceService;
private final ServerType serverType;
@Value("${kestra.anonymous-usage-report.uri}")
protected URI url;
public ServerEventSender( ) {
this.serverType = KestraContext.getContext().getServerType();
}
public void send(final Instant now, final Type type, Object event) {
ServerEvent serverEvent = ServerEvent
.builder()
.uuid(UUID.randomUUID().toString())
.sessionUuid(SESSION_UUID)
.instanceUuid(instanceService.fetch())
.serverType(serverType)
.serverVersion(versionProvider.getVersion())
.reportedAt(now.atZone(ZoneId.systemDefault()))
.payload(event)
.zoneId(ZoneId.systemDefault())
.build();
try {
MutableHttpRequest<ServerEvent> request = this.request(serverEvent, type);
if (log.isTraceEnabled()) {
log.trace("Report anonymous usage: '{}'", OBJECT_MAPPER.writeValueAsString(serverEvent));
}
this.handleResponse(client.toBlocking().retrieve(request, Argument.of(Result.class), Argument.of(JsonError.class)));
} catch (HttpClientResponseException t) {
log.trace("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
} catch (Exception t) {
log.trace("Unable to handle anonymous usage", t);
}
}
private void handleResponse (Result result){
}
protected MutableHttpRequest<ServerEvent> request(ServerEvent event, Type type) throws Exception {
URI baseUri = URI.create(this.url.toString().endsWith("/") ? this.url.toString() : this.url + "/");
URI resolvedUri = baseUri.resolve(type.name().toLowerCase());
return HttpRequest.POST(resolvedUri, event)
.header("User-Agent", "Kestra/" + versionProvider.getVersion());
}
}

View File

@@ -0,0 +1,9 @@
package io.kestra.core.reporter;
/**
* A reportable event type.
*/
public interface Type {
String name();
}

View File

@@ -0,0 +1,12 @@
package io.kestra.core.reporter;
/**
* All supported reportable event type.
*/
public enum Types implements Type {
USAGE,
SYSTEM_INFORMATION,
PLUGIN_METRICS,
SERVICE_USAGE,
PLUGIN_USAGE;
}

View File

@@ -0,0 +1,6 @@
package io.kestra.core.reporter.model;
public record Count(
long count
) {
}

View File

@@ -0,0 +1,80 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.ExecutionUsage;
import io.kestra.core.models.collectors.FlowUsage;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.kestra.core.reporter.model.Count;
import io.kestra.core.repositories.DashboardRepositoryInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
import java.time.Instant;
import java.util.Objects;
@Singleton
public class FeatureUsageReport extends AbstractReportable<FeatureUsageReport.UsageEvent> {
private final FlowRepositoryInterface flowRepository;
private final ExecutionRepositoryInterface executionRepository;
private final DashboardRepositoryInterface dashboardRepository;
private final boolean enabled;
@Inject
public FeatureUsageReport(FlowRepositoryInterface flowRepository,
ExecutionRepositoryInterface executionRepository,
DashboardRepositoryInterface dashboardRepository) {
super(Types.USAGE, Schedules.hourly(), true);
this.flowRepository = flowRepository;
this.executionRepository = executionRepository;
this.dashboardRepository = dashboardRepository;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public UsageEvent report(final Instant now, TimeInterval interval) {
return UsageEvent
.builder()
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository, interval.from(), interval.to()))
.dashboards(new Count(dashboardRepository.count()))
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Override
public UsageEvent report(Instant now, TimeInterval interval, String tenant) {
Objects.requireNonNull(tenant, "tenant is null");
Objects.requireNonNull(interval, "interval is null");
return UsageEvent
.builder()
.flows(FlowUsage.of(tenant, flowRepository))
.executions(ExecutionUsage.of(tenant, executionRepository, interval.from(), interval.to()))
.build();
}
@SuperBuilder(toBuilder = true)
@Getter
@Jacksonized
@Introspected
public static class UsageEvent implements Event {
private ExecutionUsage executions;
private FlowUsage flows;
private Count dashboards;
}
}

View File

@@ -0,0 +1,105 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.PluginMetric;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.kestra.core.utils.ListUtils;
import io.micrometer.core.instrument.Timer;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@Singleton
public class PluginMetricReport extends AbstractReportable<PluginMetricReport.PluginMetricEvent> {
private final PluginRegistry pluginRegistry;
private final MetricRegistry metricRegistry;
private final boolean enabled;
@Inject
public PluginMetricReport(PluginRegistry pluginRegistry,
MetricRegistry metricRegistry) {
super(Types.PLUGIN_METRICS, Schedules.daily(), false);
this.metricRegistry = metricRegistry;
this.pluginRegistry = pluginRegistry;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.SCHEDULER.equals(serverType) || ServerType.WORKER.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public PluginMetricEvent report(final Instant now, final TimeInterval period) {
return PluginMetricEvent
.builder()
.pluginMetrics(pluginMetrics())
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Builder
@Introspected
public record PluginMetricEvent (
List<PluginMetric> pluginMetrics
) implements Event {
}
private List<PluginMetric> pluginMetrics() {
List<PluginMetric> taskMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTasks().stream())
.map(Class::getName)
.map(this::taskMetric)
.flatMap(Optional::stream)
.toList();
List<PluginMetric> triggerMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream())
.map(Class::getName)
.map(this::triggerMetric)
.flatMap(Optional::stream)
.toList();
return ListUtils.concat(taskMetrics, triggerMetrics);
}
private Optional<PluginMetric> taskMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_ENDED_DURATION).tag(MetricRegistry.TAG_TASK_TYPE, type).timer();
return fromTimer(type, duration);
}
private Optional<PluginMetric> triggerMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
if (duration == null) {
// this may be because this is a trigger executed by the scheduler, we search there instead
duration = metricRegistry.find(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
}
return fromTimer(type, duration);
}
private Optional<PluginMetric> fromTimer(String type, Timer timer) {
if (timer == null || timer.count() == 0) {
return Optional.empty();
}
double count = timer.count();
double totalTime = timer.totalTime(TimeUnit.MILLISECONDS);
double meanTime = timer.mean(TimeUnit.MILLISECONDS);
return Optional.of(new PluginMetric(type, count, totalTime, meanTime));
}
}

View File

@@ -0,0 +1,51 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.PluginUsage;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.time.Instant;
import java.util.List;
@Singleton
public class PluginUsageReport extends AbstractReportable<PluginUsageReport.PluginUsageEvent> {
private final PluginRegistry pluginRegistry;
private final boolean enabled;
@Inject
public PluginUsageReport(PluginRegistry pluginRegistry) {
super(Types.PLUGIN_USAGE, Schedules.daily(), false);
this.pluginRegistry = pluginRegistry;
ServerType serverType = KestraContext.getContext().getServerType();
this.enabled = ServerType.EXECUTOR.equals(serverType) || ServerType.STANDALONE.equals(serverType);
}
@Override
public PluginUsageEvent report(final Instant now, final TimeInterval period) {
return PluginUsageEvent
.builder()
.plugins(PluginUsage.of(pluginRegistry))
.build();
}
@Override
public boolean isEnabled() {
return enabled;
}
@Builder
@Introspected
public record PluginUsageEvent(
List<PluginUsage> plugins
) implements Event {
}
}

View File

@@ -0,0 +1,53 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.contexts.KestraContext;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.ServiceUsage;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.time.Duration;
import java.time.Instant;
@Singleton
public class ServiceUsageReport extends AbstractReportable<ServiceUsageReport.ServiceUsageEvent> {
private final ServiceInstanceRepositoryInterface serviceInstanceRepository;
private final boolean isEnabled;
@Inject
public ServiceUsageReport(ServiceInstanceRepositoryInterface serviceInstanceRepository) {
super(Types.SERVICE_USAGE, Schedules.daily(), false);
this.serviceInstanceRepository = serviceInstanceRepository;
ServerType serverType = KestraContext.getContext().getServerType();
this.isEnabled = ServerType.STANDALONE.equals(serverType) || ServerType.EXECUTOR.equals(serverType);
}
@Override
public ServiceUsageEvent report(final Instant now, final TimeInterval period) {
return ServiceUsageEvent
.builder()
.services(ServiceUsage.of(period.from().toInstant(), period.to().toInstant(), serviceInstanceRepository, Duration.ofMinutes(5)))
.build();
}
@Override
public boolean isEnabled() {
return isEnabled;
}
@Builder
@Introspected
public record ServiceUsageEvent(
ServiceUsage services
) implements Event {
}
}

View File

@@ -0,0 +1,63 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.models.collectors.ConfigurationUsage;
import io.kestra.core.models.collectors.HostUsage;
import io.kestra.core.reporter.AbstractReportable;
import io.kestra.core.reporter.Schedules;
import io.kestra.core.reporter.Types;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import java.lang.management.ManagementFactory;
import java.time.Instant;
import java.util.Set;
@Singleton
public class SystemInformationReport extends AbstractReportable<SystemInformationReport.SystemInformationEvent> {
private final Environment environment;
private final ApplicationContext applicationContext;
private final String kestraUrl;
private final Instant startTime;
@Inject
public SystemInformationReport(ApplicationContext applicationContext) {
super(Types.SYSTEM_INFORMATION, Schedules.daily(), false);
this.environment = applicationContext.getEnvironment();
this.applicationContext = applicationContext;
this.kestraUrl = applicationContext.getProperty("kestra.url", String.class).orElse(null);
this.startTime = Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime());
}
@Override
public SystemInformationEvent report(final Instant now, final TimeInterval timeInterval) {
return SystemInformationEvent
.builder()
.environments(environment.getActiveNames())
.configurations(ConfigurationUsage.of(applicationContext))
.startTime(startTime)
.host(HostUsage.of())
.uri(kestraUrl)
.build();
}
@Override
public boolean isEnabled() {
return true;
}
@Builder
@Introspected
public record SystemInformationEvent(
Set<String> environments,
HostUsage host,
ConfigurationUsage configurations,
Instant startTime,
String uri
) implements Event {
}
}

View File

@@ -16,6 +16,14 @@ import java.util.Map;
import java.util.Optional;
public interface DashboardRepositoryInterface {
/**
* Gets the total number of Dashboards.
*
* @return the total number.
*/
long count();
Boolean isEnabled();
Optional<Dashboard> get(String tenantId, String id);

View File

@@ -237,9 +237,9 @@ public class ExecutorService {
try {
state = flowableParent.resolveState(runContext, execution, parentTaskRun);
} catch (Exception e) {
// This will lead to the next task being still executed but at least Kestra will not crash.
// This will lead to the next task being still executed, but at least Kestra will not crash.
// This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), e);
runContext.logger().error("Unable to resolve state from the Flowable task: {}", e.getMessage(), e);
state = Optional.of(State.Type.FAILED);
}
Optional<WorkerTaskResult> endedTask = childWorkerTaskTypeToWorkerTask(
@@ -380,11 +380,9 @@ public class ExecutorService {
if (flow.getOutputs() != null) {
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
try {
Map<String, Object> outputs = flow.getOutputs()
.stream()
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
outputs = runContext.render(outputs);
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
newExecution = newExecution.withOutputs(outputs);
} catch (Exception e) {
@@ -589,6 +587,23 @@ public class ExecutorService {
list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId()))
.collect(Collectors.toCollection(ArrayList::new));
}
// If the task is a flowable and its terminated, check that all children are terminated.
// This may not be the case for parallel flowable tasks like Parallel, Dag, ForEach...
// After a fail task, some child flowable may not be correctly terminated.
if (task instanceof FlowableTask<?> && taskRun.getState().isTerminated()) {
List<TaskRun> updated = executor.getExecution().findChildren(taskRun).stream()
.filter(child -> !child.getState().isTerminated())
.map(throwFunction(child -> child.withState(taskRun.getState().getCurrent())))
.toList();
if (!updated.isEmpty()) {
Execution execution = executor.getExecution();
for (TaskRun child : updated) {
execution = execution.withTaskRun(child);
}
executor = executor.withExecution(execution, "handledTerminatedFlowableTasks");
}
}
}
metricRegistry

View File

@@ -2,7 +2,6 @@ package io.kestra.core.runners;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.KestraRuntimeException;
@@ -12,11 +11,14 @@ import io.kestra.core.models.flows.DependsOn;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Output;
import io.kestra.core.models.flows.RenderableInput;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.FileInput;
import io.kestra.core.models.flows.input.InputAndValue;
import io.kestra.core.models.flows.input.ItemTypeInterface;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.property.URIFetcher;
import io.kestra.core.models.tasks.common.EncryptedString;
import io.kestra.core.models.validations.ManualConstraintViolation;
@@ -75,16 +77,19 @@ public class FlowInputOutput {
private final StorageInterface storageInterface;
private final Optional<String> secretKey;
private final RunContextFactory runContextFactory;
private final VariableRenderer variableRenderer;
@Inject
public FlowInputOutput(
StorageInterface storageInterface,
RunContextFactory runContextFactory,
VariableRenderer variableRenderer,
@Nullable @Value("${kestra.encryption.secret-key}") String secretKey
) {
this.storageInterface = storageInterface;
this.runContextFactory = runContextFactory;
this.secretKey = Optional.ofNullable(secretKey);
this.variableRenderer = variableRenderer;
}
/**
@@ -249,11 +254,7 @@ public class FlowInputOutput {
}
final Map<String, ResolvableInput> resolvableInputMap = Collections.unmodifiableMap(inputs.stream()
.map(input -> {
// get value or default
Object value = Optional.ofNullable((Object) data.get(input.getId())).orElseGet(input::getDefaults);
return ResolvableInput.of(input, value);
})
.map(input -> ResolvableInput.of(input,data.get(input.getId())))
.collect(Collectors.toMap(it -> it.get().input().getId(), Function.identity(), (o1, o2) -> o1, LinkedHashMap::new)));
resolvableInputMap.values().forEach(input -> resolveInputValue(input, flow, execution, resolvableInputMap));
@@ -312,8 +313,16 @@ public class FlowInputOutput {
});
resolvable.setInput(input);
Object value = resolvable.get().value();
// resolve default if needed
if (value == null && input.getDefaults() != null) {
value = resolveDefaultValue(input, runContext);
resolvable.isDefault(true);
}
// validate and parse input value
final Object value = resolvable.get().value();
if (value == null) {
if (input.getRequired()) {
resolvable.resolveWithError(input.toConstraintViolationException("missing required input", null));
@@ -341,7 +350,33 @@ public class FlowInputOutput {
return resolvable.get();
}
public static Object resolveDefaultValue(Input<?> input, PropertyContext renderer) throws IllegalVariableEvaluationException {
return switch (input.getType()) {
case STRING, ENUM, SELECT, SECRET, EMAIL -> resolveDefaultPropertyAs(input, renderer, String.class);
case INT -> resolveDefaultPropertyAs(input, renderer, Integer.class);
case FLOAT -> resolveDefaultPropertyAs(input, renderer, Float.class);
case BOOLEAN, BOOL -> resolveDefaultPropertyAs(input, renderer, Boolean.class);
case DATETIME -> resolveDefaultPropertyAs(input, renderer, Instant.class);
case DATE -> resolveDefaultPropertyAs(input, renderer, LocalDate.class);
case TIME -> resolveDefaultPropertyAs(input, renderer, LocalTime.class);
case DURATION -> resolveDefaultPropertyAs(input, renderer, Duration.class);
case FILE, URI -> resolveDefaultPropertyAs(input, renderer, URI.class);
case JSON, YAML -> resolveDefaultPropertyAs(input, renderer, Object.class);
case ARRAY -> resolveDefaultPropertyAsList(input, renderer, Object.class);
case MULTISELECT -> resolveDefaultPropertyAsList(input, renderer, String.class);
};
}
@SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.as((Property<T>) input.getDefaults(), renderer, clazz);
}
@SuppressWarnings("unchecked")
private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
}
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies) {
Map<String, Object> flattenInputs = MapUtils.flattenToNestedMap(dependencies.entrySet()
.stream()
@@ -368,7 +403,7 @@ public class FlowInputOutput {
final Map<String, Object> in
) {
if (flow.getOutputs() == null) {
return ImmutableMap.of();
return Map.of();
}
Map<String, Object> results = flow
.getOutputs()
@@ -376,6 +411,9 @@ public class FlowInputOutput {
.map(output -> {
Object current = in == null ? null : in.get(output.getId());
try {
if (current == null && Boolean.FALSE.equals(output.getRequired())) {
return Optional.of(new AbstractMap.SimpleEntry<>(output.getId(), null));
}
return parseData(execution, output, current)
.map(entry -> {
if (output.getType().equals(Type.SECRET)) {
@@ -406,7 +444,7 @@ public class FlowInputOutput {
if (data.getType() == null) {
return Optional.of(new AbstractMap.SimpleEntry<>(data.getId(), current));
}
final Type elementType = data instanceof ItemTypeInterface itemTypeInterface ? itemTypeInterface.getItemType() : null;
return Optional.of(new AbstractMap.SimpleEntry<>(
@@ -483,6 +521,30 @@ public class FlowInputOutput {
throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
}
}
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
if (outputs == null) return Map.of();
// render required outputs
Map<String, Object> outputsById = outputs
.stream()
.filter(output -> output.getRequired() == null || output.getRequired())
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
outputsById = runContext.render(outputsById);
// render optional outputs one by one to catch, log, and skip any error.
for (io.kestra.core.models.flows.Output output : outputs) {
if (Boolean.FALSE.equals(output.getRequired())) {
try {
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
} catch (Exception e) {
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
outputsById.put(output.getId(), null);
}
}
}
return outputsById;
}
/**
* Mutable wrapper to hold a flow's input, and it's resolved value.
@@ -511,22 +573,26 @@ public class FlowInputOutput {
return input;
}
public void isDefault(boolean isDefault) {
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
}
public void setInput(final Input<?> input) {
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.exception());
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
}
public void resolveWithEnabled(boolean enabled) {
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.exception());
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exception());
markAsResolved();
}
public void resolveWithValue(@Nullable Object value) {
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.exception());
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exception());
markAsResolved();
}
public void resolveWithError(@Nullable ConstraintViolationException exception) {
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), exception);
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
markAsResolved();
}

View File

@@ -6,6 +6,7 @@ import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.storages.StateStore;
import io.kestra.core.storages.Storage;
import io.kestra.core.storages.kv.KVStore;
@@ -18,7 +19,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
public abstract class RunContext {
public abstract class RunContext implements PropertyContext {
/**
* Returns the trigger execution id attached to this context.

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.PluginConfigurations;
@@ -77,7 +78,7 @@ public class RunContextFactory {
public RunContextInitializer initializer() {
return applicationContext.getBean(RunContextInitializer.class);
}
public RunContext of(FlowInterface flow, Execution execution) {
return of(flow, execution, Function.identity());
}
@@ -98,7 +99,7 @@ public class RunContextFactory {
.withDecryptVariables(true)
.withSecretInputs(secretInputsFromFlow(flow))
)
.build(runContextLogger))
.build(runContextLogger, PropertyContext.create(variableRenderer)))
.withSecretInputs(secretInputsFromFlow(flow))
.build();
}
@@ -127,7 +128,7 @@ public class RunContextFactory {
.withTaskRun(taskRun)
.withDecryptVariables(decryptVariables)
.withSecretInputs(secretInputsFromFlow(flow))
.build(runContextLogger))
.build(runContextLogger, PropertyContext.create(variableRenderer)))
.withKvStoreService(kvStoreService)
.withSecretInputs(secretInputsFromFlow(flow))
.withTask(task)
@@ -146,7 +147,7 @@ public class RunContextFactory {
.withFlow(flow)
.withTrigger(trigger)
.withSecretInputs(secretInputsFromFlow(flow))
.build(runContextLogger)
.build(runContextLogger, PropertyContext.create(variableRenderer))
)
.withSecretInputs(secretInputsFromFlow(flow))
.withTrigger(trigger)

View File

@@ -4,15 +4,11 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -27,12 +23,19 @@ public class RunContextProperty<T> {
private final RunContext runContext;
private final Task task;
private final AbstractTrigger trigger;
private final boolean skipCache;
RunContextProperty(Property<T> property, RunContext runContext) {
this(property, runContext, false);
}
RunContextProperty(Property<T> property, RunContext runContext, boolean skipCache) {
this.property = property;
this.runContext = runContext;
this.task = ((DefaultRunContext) runContext).getTask();
this.trigger = ((DefaultRunContext) runContext).getTrigger();
this.skipCache = skipCache;
}
private void validate() {
@@ -45,6 +48,19 @@ public class RunContextProperty<T> {
log.trace("Unable to do validation: no task or trigger found");
}
}
/**
* Returns a new {@link RunContextProperty} that will always be rendered by evaluating
* its original Pebble expression, without using any previously cached value.
* <p>
* This ensures that each time the property is rendered, the underlying
* expression is re-evaluated to produce a fresh result.
*
* @return a new {@link Property} that bypasses the cache
*/
public RunContextProperty<T> skipCache() {
return new RunContextProperty<>(this.property, this.runContext, true);
}
/**
* Render a property then convert it to its target type and validate it.<br>
@@ -55,13 +71,13 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz)));
validate();
return as;
}
/**
* Render a property with additional variables, then convert it to its target type and validate it.<br>
*
@@ -71,7 +87,7 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz, variables)));
validate();
@@ -89,7 +105,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz)))
.orElse((T) Collections.emptyList());
@@ -108,7 +124,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz, variables)))
.orElse((T) Collections.emptyList());
@@ -127,7 +143,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass)))
.orElse((T) Collections.emptyMap());
@@ -146,11 +162,15 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
var as = Optional.ofNullable(this.property)
var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass, variables)))
.orElse((T) Collections.emptyMap());
validate();
return as;
}
private Property<T> getProperty() {
return skipCache ? this.property.skipCache() : this.property;
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.runners;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
@@ -9,6 +10,7 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.input.SecretInput;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.utils.ListUtils;
@@ -138,10 +140,10 @@ public final class RunVariables {
* @param logger The {@link RunContextLogger logger}
* @return The immutable map of variables.
*/
Map<String, Object> build(final RunContextLogger logger);
Map<String, Object> build(RunContextLogger logger, PropertyContext propertyContext);
}
public record KestraConfiguration(String environment, String url) { }
public record KestraConfiguration(String environment, String url) { }
/**
* Default builder class for constructing variables.
@@ -174,7 +176,7 @@ public final class RunVariables {
// Note: for performance reason, cloning maps should be avoided as much as possible.
@Override
public Map<String, Object> build(final RunContextLogger logger) {
public Map<String, Object> build(final RunContextLogger logger, final PropertyContext propertyContext) {
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
builder.put("envs", envs != null ? envs : Map.of());
@@ -280,9 +282,15 @@ public final class RunVariables {
if (flow != null && flow.getInputs() != null) {
// we add default inputs value from the flow if not already set, this will be useful for triggers
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> inputs.put(input.getId(), input.getDefaults()));
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> {
try {
inputs.put(input.getId(), FlowInputOutput.resolveDefaultValue(input, propertyContext));
} catch (IllegalVariableEvaluationException e) {
throw new RuntimeException("Unable to inject default value for input '" + input.getId() + "'", e);
}
});
}
if (!inputs.isEmpty()) {

View File

@@ -85,7 +85,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
@Inject
@Named(QueueFactoryInterface.WORKERJOB_NAMED)
private QueueInterface<WorkerJob> workerJobQueue;
private WorkerJobQueueInterface workerJobQueue;
@Inject
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
@@ -274,12 +274,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
}
}));
this.receiveCancellations.addFirst(this.workerJobQueue.receive(
this.receiveCancellations.addFirst(this.workerJobQueue.subscribe(
this.id,
this.workerGroup,
Worker.class,
either -> {
pendingJobCount.incrementAndGet();
executorService.execute(() -> {
pendingJobCount.decrementAndGet();
runningJobCount.incrementAndGet();

View File

@@ -30,10 +30,7 @@ import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.*;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.*;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.util.CollectionUtils;
@@ -92,7 +89,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private volatile Boolean isReady = false;
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> scheduledFuture;
private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> executionMonitorFuture;
@Getter
protected SchedulerTriggerStateInterface triggerState;
@@ -153,7 +152,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.flowListeners.run();
this.flowListeners.listen(this::initializedTriggers);
ScheduledFuture<?> evaluationLoop = scheduleExecutor.scheduleAtFixedRate(
scheduledFuture = scheduleExecutor.scheduleAtFixedRate(
this::handle,
0,
1,
@@ -163,10 +162,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the evaluation loop thread
Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
() -> {
Await.until(evaluationLoop::isDone);
Await.until(scheduledFuture::isDone);
try {
evaluationLoop.get();
scheduledFuture.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -178,7 +177,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
);
// Periodically report metrics and logs of running executions
ScheduledFuture<?> monitoringLoop = executionMonitorExecutor.scheduleWithFixedDelay(
executionMonitorFuture = executionMonitorExecutor.scheduleWithFixedDelay(
this::executionMonitor,
30,
10,
@@ -188,10 +187,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the monitoring loop thread
Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
() -> {
Await.until(monitoringLoop::isDone);
Await.until(executionMonitorFuture::isDone);
try {
monitoringLoop.get();
executionMonitorFuture.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -1007,8 +1006,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
setState(ServiceState.TERMINATING);
this.receiveCancellations.forEach(Runnable::run);
this.scheduleExecutor.shutdown();
this.executionMonitorExecutor.shutdown();
ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));
ExecutorsUtils.closeScheduledThreadPool(executionMonitorExecutor, Duration.ofSeconds(5), List.of(executionMonitorFuture));
try {
if (onClose != null) {
onClose.run();

View File

@@ -1,6 +1,7 @@
package io.kestra.core.server;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@@ -8,9 +9,11 @@ import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +28,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
protected final ServerConfig serverConfig;
private final AtomicBoolean isStopped = new AtomicBoolean(false);
private ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> scheduledFuture;
private Instant lastScheduledExecution;
/**
@@ -98,7 +102,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
Duration scheduleInterval = getScheduleInterval();
log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval);
scheduledExecutorService.scheduleAtFixedRate(
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
this,
0,
scheduleInterval.toSeconds(),
@@ -133,20 +137,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
@Override
public void close() {
if (isStopped.compareAndSet(false, true) && scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
if (scheduledExecutorService.isTerminated()) {
return;
}
try {
if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Timeout", name);
}
log.debug("Stopped scheduled '{}' task.", name);
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
log.debug("Failed to wait for scheduled '{}' task termination. Cause: Interrupted.", name);
}
ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(scheduledFuture));
}
}
}

View File

@@ -1,22 +0,0 @@
package io.kestra.core.services;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.annotation.Scheduled;
import lombok.extern.slf4j.Slf4j;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
@Slf4j
@Requires(property = "kestra.anonymous-usage-report.enabled", value = "true")
@Requires(property = "kestra.server-type")
public class CollectorScheduler {
@Inject
protected CollectorService collectorService;
@Scheduled(initialDelay = "${kestra.anonymous-usage-report.initial-delay}", fixedDelay = "${kestra.anonymous-usage-report.fixed-delay}")
public void report() {
collectorService.report();
}
}

View File

@@ -1,220 +0,0 @@
package io.kestra.core.services;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.collectors.*;
import io.kestra.core.plugins.PluginRegistry;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.VersionProvider;
import io.micrometer.core.instrument.Timer;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.hateoas.JsonError;
import io.micronaut.reactor.http.client.ReactorHttpClient;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@Singleton
@Slf4j
public class CollectorService {
protected static final String UUID = IdUtils.create();
@Inject
@Client
protected ReactorHttpClient client;
@Inject
protected ApplicationContext applicationContext;
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private ExecutionRepositoryInterface executionRepository;
@Inject
protected InstanceService instanceService;
@Inject
protected VersionProvider versionProvider;
@Inject
protected PluginRegistry pluginRegistry;
@Nullable
@Value("${kestra.server-type}")
protected ServerType serverType;
@Nullable
@Value("${kestra.url:}")
protected String kestraUrl;
@Value("${kestra.anonymous-usage-report.uri}")
protected URI url;
@Inject
private ServiceInstanceRepositoryInterface serviceRepository;
@Inject
private MetricRegistry metricRegistry;
private transient Usage defaultUsage;
protected synchronized Usage defaultUsage() {
boolean first = defaultUsage == null;
if (first) {
defaultUsage = Usage.builder()
.startUuid(UUID)
.instanceUuid(instanceService.fetch())
.serverType(serverType)
.version(versionProvider.getVersion())
.zoneId(ZoneId.systemDefault())
.uri(kestraUrl == null ? null : kestraUrl)
.environments(applicationContext.getEnvironment().getActiveNames())
.startTime(Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime()))
.host(HostUsage.of())
.configurations(ConfigurationUsage.of(applicationContext))
.plugins(PluginUsage.of(pluginRegistry))
.build();
}
return defaultUsage;
}
public Usage metrics(boolean details) {
return metrics(details, serverType == ServerType.WORKER || serverType == ServerType.SCHEDULER || serverType == ServerType.STANDALONE);
}
public Usage metrics(boolean details, boolean metrics) {
ZonedDateTime to = ZonedDateTime.now();
ZonedDateTime from = to
.toLocalDate()
.atStartOfDay(ZoneId.systemDefault())
.minusDays(1);
return metrics(details, metrics, from, to);
}
public Usage metrics(boolean details, boolean metrics, ZonedDateTime from, ZonedDateTime to) {
Usage.UsageBuilder<?, ?> builder = defaultUsage()
.toBuilder()
.uuid(IdUtils.create());
if (details) {
builder = builder
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository, from, to))
.services(ServiceUsage.of(from.toInstant(), to.toInstant(), serviceRepository, Duration.ofMinutes(5)));
}
if (metrics) {
builder = builder.pluginMetrics(pluginMetrics());
}
return builder.build();
}
public void report() {
try {
Usage metrics = this.metrics(serverType == ServerType.EXECUTOR || serverType == ServerType.STANDALONE);
MutableHttpRequest<Usage> post = this.request(metrics);
if (log.isTraceEnabled()) {
log.trace("Report anonymous usage: '{}'", JacksonMapper.ofJson().writeValueAsString(metrics));
}
Result result = client.toBlocking()
.retrieve(
post,
Argument.of(Result.class),
Argument.of(JsonError.class)
);
this.handleResponse(result);
} catch (HttpClientResponseException t) {
log.debug("Unable to report anonymous usage with body '{}'", t.getResponse().getBody(String.class), t);
} catch (Exception t) {
log.debug("Unable to handle anonymous usage", t);
}
}
private void handleResponse(Result result) {
}
protected MutableHttpRequest<Usage> request(Usage metrics) throws Exception {
return HttpRequest.POST(this.url, metrics)
.header("User-Agent", "Kestra/" + versionProvider.getVersion());
}
private List<PluginMetric> pluginMetrics() {
List<PluginMetric> taskMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTasks().stream())
.map(cls -> cls.getName())
.map(type -> taskMetric(type))
.filter(opt -> opt.isPresent())
.map(opt -> opt.get())
.toList();
List<PluginMetric> triggerMetrics = pluginRegistry.plugins().stream()
.flatMap(registeredPlugin -> registeredPlugin.getTriggers().stream())
.map(cls -> cls.getName())
.map(type -> triggerMetric(type))
.filter(opt -> opt.isPresent())
.map(opt -> opt.get())
.toList();
return ListUtils.concat(taskMetrics, triggerMetrics);
}
private Optional<PluginMetric> taskMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_ENDED_DURATION).tag(MetricRegistry.TAG_TASK_TYPE, type).timer();
return fromTimer(type, duration);
}
private Optional<PluginMetric> triggerMetric(String type) {
Timer duration = metricRegistry.find(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
if (duration == null) {
// this may be because this is a trigger executed by the scheduler, we search there instead
duration = metricRegistry.find(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION).tag(MetricRegistry.TAG_TRIGGER_TYPE, type).timer();
}
return fromTimer(type, duration);
}
private Optional<PluginMetric> fromTimer(String type, Timer timer) {
if (timer == null || timer.count() == 0) {
return Optional.empty();
}
double count = timer.count();
double totalTime = timer.totalTime(TimeUnit.MILLISECONDS);
double meanTime = timer.mean(TimeUnit.MILLISECONDS);
return Optional.of(new PluginMetric(type, count, totalTime, meanTime));
}
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.services;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
@@ -10,7 +11,6 @@ import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInte
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.ListUtils;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -24,14 +24,15 @@ import java.util.stream.Stream;
@Singleton
public class FlowTriggerService {
@Inject
private ConditionService conditionService;
private final ConditionService conditionService;
private final RunContextFactory runContextFactory;
private final FlowService flowService;
@Inject
private RunContextFactory runContextFactory;
@Inject
private FlowService flowService;
public FlowTriggerService(ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) {
this.conditionService = conditionService;
this.runContextFactory = runContextFactory;
this.flowService = flowService;
}
// used in EE only
public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<FlowWithSource> allFlows) {
@@ -53,6 +54,8 @@ public class FlowTriggerService {
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
// prevent recursive flow triggers
.filter(flow -> flowService.removeUnwanted(flow, execution))
// filter out Test Executions
.filter(flow -> execution.getKind() == null)
// ensure flow & triggers are enabled
.filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())

View File

@@ -10,22 +10,34 @@ import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class InstanceService {
private final SettingRepositoryInterface settingRepository;
@Inject
private SettingRepositoryInterface settingRepository;
private Setting instanceIdSetting;
public InstanceService(SettingRepositoryInterface settingRepository) {
this.settingRepository = settingRepository;
}
private volatile Setting instanceIdSetting;
public String fetch() {
if (this.instanceIdSetting == null) {
instanceIdSetting = settingRepository
.findByKey(Setting.INSTANCE_UUID)
.orElseGet(() -> settingRepository.save(Setting.builder()
.key(Setting.INSTANCE_UUID)
.value(IdUtils.create())
.build()
));
synchronized (this) {
if (this.instanceIdSetting == null) {
instanceIdSetting = fetchInstanceUuid();
}
}
}
return this.instanceIdSetting.getValue().toString();
}
private Setting fetchInstanceUuid() {
return settingRepository
.findByKey(Setting.INSTANCE_UUID)
.orElseGet(() -> settingRepository.save(Setting.builder()
.key(Setting.INSTANCE_UUID)
.value(IdUtils.create())
.build()
));
}
}

View File

@@ -54,6 +54,18 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Retrieves an input stream of a instance resource for the given storage URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace of the object (may be null)
* @param uri the URI of the object to retrieve
* @return an InputStream to read the object's contents
* @throws IOException if the object cannot be read
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
InputStream getInstanceResource(@Nullable String namespace, URI uri) throws IOException;
/**
* Retrieves a storage object along with its metadata.
*
@@ -91,6 +103,18 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Lists the attributes of all instance files and instance directories under the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI to list
* @return a list of file attributes
* @throws IOException if the listing fails
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
List<FileAttributes> listInstanceResource(@Nullable String namespace, URI uri) throws IOException;
/**
* Checks whether the given URI exists in the internal storage.
*
@@ -108,6 +132,23 @@ public interface StorageInterface extends AutoCloseable, Plugin {
}
}
/**
* Checks whether the given URI exists in the instance internal storage.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI to check
* @return true if the URI exists, false otherwise
*/
@SuppressWarnings("try")
default boolean existsInstanceResource(@Nullable String namespace, URI uri) {
try (InputStream ignored = getInstanceResource(namespace, uri)) {
return true;
} catch (IOException ieo) {
return false;
}
}
/**
* Retrieves the metadata attributes for the given URI.
*
@@ -120,6 +161,18 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Retrieves the metadata attributes for the given URI.
* n instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI of the object
* @return the file attributes
* @throws IOException if the attributes cannot be retrieved
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
FileAttributes getInstanceAttributes(@Nullable String namespace, URI uri) throws IOException;
/**
* Stores data at the given URI.
*
@@ -148,34 +201,86 @@ public interface StorageInterface extends AutoCloseable, Plugin {
@Retryable(includes = {IOException.class})
URI put(String tenantId, @Nullable String namespace, URI uri, StorageObject storageObject) throws IOException;
/**
* Stores instance data at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the target URI
* @param data the input stream containing the data to store
* @return the URI of the stored object
* @throws IOException if storing fails
*/
@Retryable(includes = {IOException.class})
default URI putInstanceResource(@Nullable String namespace, URI uri, InputStream data) throws IOException {
return this.putInstanceResource(namespace, uri, new StorageObject(null, data));
}
/**
* Stores a instance storage object at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the target URI
* @param storageObject the storage object to store
* @return the URI of the stored object
* @throws IOException if storing fails
*/
@Retryable(includes = {IOException.class})
URI putInstanceResource(@Nullable String namespace, URI uri, StorageObject storageObject) throws IOException;
/**
* Deletes the object at the given URI.
*
* @param tenantId the tenant identifier (may be null for global deletion)
* @param tenantId the tenant identifier
* @param namespace the namespace (may be null)
* @param uri the URI of the object to delete
* @return true if deletion was successful
* @throws IOException if deletion fails
*/
@Retryable(includes = {IOException.class})
boolean delete(@Nullable String tenantId, @Nullable String namespace, URI uri) throws IOException;
boolean delete(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Deletes the instance object at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace (may be null)
* @param uri the URI of the object to delete
* @return true if deletion was successful
* @throws IOException if deletion fails
*/
@Retryable(includes = {IOException.class})
boolean deleteInstanceResource(@Nullable String namespace, URI uri) throws IOException;
/**
* Creates a new directory at the given URI.
*
* @param tenantId the tenant identifier (optional)
* @param tenantId the tenant identifier
* @param namespace the namespace (optional)
* @param uri the URI of the directory to create
* @return the URI of the created directory
* @throws IOException if creation fails
*/
@Retryable(includes = {IOException.class})
URI createDirectory(@Nullable String tenantId, @Nullable String namespace, URI uri) throws IOException;
URI createDirectory(String tenantId, @Nullable String namespace, URI uri) throws IOException;
/**
* Creates a new instance directory at the given URI.
* An instance resource is a resource stored outside any tenant storage, accessible for the whole instance
*
* @param namespace the namespace
* @param uri the URI of the directory to create
* @return the URI of the created directory
* @throws IOException if creation fails
*/
@Retryable(includes = {IOException.class})
URI createInstanceDirectory(String namespace, URI uri) throws IOException;
/**
* Moves an object from one URI to another.
*
* @param tenantId the tenant identifier (optional)
* @param tenantId the tenant identifier
* @param namespace the namespace (optional)
* @param from the source URI
* @param to the destination URI
@@ -183,7 +288,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
* @throws IOException if moving fails
*/
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
URI move(@Nullable String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;
URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;
/**
* Deletes all objects that match the given URI prefix.
@@ -226,23 +331,32 @@ public interface StorageInterface extends AutoCloseable, Plugin {
}
/**
* Builds the internal storage path based on tenant ID and URI.
* Builds the internal storage path based on the URI.
*
* @param tenantId the tenant identifier (maybe null)
* @param uri the URI of the object
* @return a normalized internal path
*/
default String getPath(@Nullable String tenantId, URI uri) {
default String getPath(URI uri) {
if (uri == null) {
uri = URI.create("/");
}
parentTraversalGuard(uri);
String path = uri.getPath();
if (tenantId != null) {
path = tenantId + (path.startsWith("/") ? path : "/" + path);
}
path = path.replaceFirst("^/", "");
return path;
}
/**
* Builds the internal storage path based on tenant ID and URI.
*
* @param tenantId the tenant identifier
* @param uri the URI of the object
* @return a normalized internal path
*/
default String getPath(String tenantId, URI uri) {
String path = getPath(uri);
path = tenantId + (path.startsWith("/") ? path : "/" + path);
return path;
}

View File

@@ -27,14 +27,21 @@ public record TestSuiteRunResult(
public static TestSuiteRunResult of(String id, String testSuiteId, String namespace, String flowId, Instant startDate, Instant endDate, List<UnitTestResult> results) {
boolean allSkipped = true;
boolean anyFailed = false;
for (UnitTestResult result : results) {
if(!result.state().equals(TestState.SKIPPED)) {
allSkipped = false;
}
if(result.state().equals(TestState.ERROR) || result.state().equals(TestState.FAILED)) {
if (result.state().equals(TestState.FAILED)) {
anyFailed = true;
}
if (result.state().equals(TestState.ERROR)) {
return new TestSuiteRunResult(id, testSuiteId, namespace, flowId, result.state(), startDate, endDate, results);
}
}
if (anyFailed) {
return new TestSuiteRunResult(id, testSuiteId, namespace, flowId, TestState.FAILED, startDate, endDate, results);
}
var state = allSkipped ? TestState.SKIPPED : TestState.SUCCESS;
return new TestSuiteRunResult(id, testSuiteId, namespace, flowId, state, startDate, endDate, results);
}

View File

@@ -4,6 +4,7 @@ import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -13,6 +14,7 @@ import java.util.List;
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class UnitTest {
@NotNull
private String id;

View File

@@ -3,12 +3,16 @@ package io.kestra.core.utils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.*;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class ExecutorsUtils {
@Inject
private ThreadMainFactoryBuilder threadFactoryBuilder;
@@ -61,6 +65,29 @@ public class ExecutorsUtils {
);
}
public static void closeScheduledThreadPool(ScheduledExecutorService scheduledExecutorService, Duration gracePeriod, List<ScheduledFuture<?>> taskFutures) {
scheduledExecutorService.shutdown();
if (scheduledExecutorService.isTerminated()) {
return;
}
try {
if (!scheduledExecutorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
log.warn("Failed to shutdown the ScheduledThreadPoolExecutor during grace period, forcing it to shutdown now");
// Ensure the scheduled task reaches a terminal state to avoid possible memory leak
ListUtils.emptyOnNull(taskFutures).forEach(taskFuture -> taskFuture.cancel(true));
scheduledExecutorService.shutdownNow();
}
log.debug("Stopped scheduled ScheduledThreadPoolExecutor.");
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
log.debug("Failed to shutdown the ScheduledThreadPoolExecutor.");
}
}
private ExecutorService wrap(String name, ExecutorService executorService) {
return ExecutorServiceMetrics.monitor(
meterRegistry,

View File

@@ -1,7 +1,10 @@
package io.kestra.core.validations.validator;
import io.kestra.core.models.flows.input.FileInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.runners.LocalPath;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.storages.Namespace;
import io.kestra.core.validations.FileInputValidation;
import io.micronaut.core.annotation.AnnotationValue;
@@ -10,24 +13,43 @@ import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.validation.validator.constraints.ConstraintValidator;
import io.micronaut.validation.validator.constraints.ConstraintValidatorContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.net.URI;
@Singleton
@Introspected
public class FileInputValidator implements ConstraintValidator<FileInputValidation, FileInput> {
@Inject
VariableRenderer variableRenderer;
@Override
public boolean isValid(@Nullable FileInput value, @NonNull AnnotationValue<FileInputValidation> annotationMetadata, @NonNull ConstraintValidatorContext context) {
if (value == null) {
return true; // nulls are allowed according to spec
}
if (value.getDefaults() != null && !LocalPath.FILE_SCHEME.equals(value.getDefaults().getScheme()) && !Namespace.NAMESPACE_FILE_SCHEME.equals(value.getDefaults().getScheme())) {
context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("inputs of type 'FILE' only support `defaults` as local files using a file URI or as namespace files using a nsfile URI")
.addConstraintViolation();
return false;
if (value.getDefaults() != null) {
PropertyContext propertyContext = PropertyContext.create(variableRenderer);
try {
URI uri = Property.as(value.getDefaults(), propertyContext, URI.class);
if (uri != null && !LocalPath.FILE_SCHEME.equals(uri.getScheme()) && !Namespace.NAMESPACE_FILE_SCHEME.equals(uri.getScheme())) {
context.disableDefaultConstraintViolation();
context
.buildConstraintViolationWithTemplate("inputs of type 'FILE' only support `defaults` as local files using a file URI or as namespace files using a nsfile URI")
.addConstraintViolation();
return false;
}
} catch (Exception ignore) {
context.disableDefaultConstraintViolation();
context
.buildConstraintViolationWithTemplate("inputs of type 'FILE' only support `defaults` with expression that can be rendered immediately")
.addConstraintViolation();
return false;
}
}
return true;
}
}

View File

@@ -90,7 +90,7 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
private static final String OUTPUTS_VAR = "outputs";
@NotNull
private Property<String> expression;
private Property<Boolean> expression;
/** {@inheritDoc} **/
@SuppressWarnings("unchecked")
@@ -105,9 +105,8 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
conditionContext.getVariables(),
Map.of(TRIGGER_VAR, Map.of(OUTPUTS_VAR, conditionContext.getExecution().getOutputs()))
);
String render = conditionContext.getRunContext().render(expression).as(String.class, variables).orElseThrow();
return !(render.isBlank() || render.trim().equals("false"));
return conditionContext.getRunContext().render(expression).skipCache().as(Boolean.class, variables).orElseThrow();
}
private boolean hasNoOutputs(final Execution execution) {

View File

@@ -535,19 +535,13 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
// We only resolve subflow outputs for an execution result when the execution is terminated.
if (taskRun.getState().isTerminated() && flow.getOutputs() != null && waitForExecution()) {
final Map<String, Object> outputs = flow.getOutputs()
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
);
final ForEachItem.Output.OutputBuilder builder = Output
.builder()
.iterations((Map<State.Type, Integer>) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_ITERATIONS))
.numberOfBatches((Integer) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES));
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
FileSerde.write(bos, runContext.render(outputs));
FileSerde.write(bos, FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext));
URI uri = runContext.storage().putFile(
new ByteArrayInputStream(bos.toByteArray()),
URI.create((String) taskRun.getOutputs().get("uri"))

View File

@@ -8,18 +8,18 @@ import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.executions.Variables;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.FlowMetaStoreInterface;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.FlowMetaStoreInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.runners.SubflowExecutionResult;
@@ -30,20 +30,22 @@ import io.kestra.core.storages.StorageContext;
import io.kestra.core.validations.NoSystemLabelValidation;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.Min;
import lombok.experimental.SuperBuilder;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.experimental.SuperBuilder;
import org.slf4j.event.Level;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.stream.Collectors;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@SuperBuilder
@ToString
@@ -218,20 +220,26 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
.<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
.orElse(true);
final Map<String, Object> subflowOutputs = Optional
.ofNullable(flow.getOutputs())
.map(outputs -> outputs
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
List<io.kestra.core.models.flows.Output> subflowOutputs = flow.getOutputs();
// region [deprecated] Subflow outputs feature
if (subflowOutputs == null && isOutputsAllowed && this.getOutputs() != null) {
subflowOutputs = this.getOutputs().entrySet().stream()
.<io.kestra.core.models.flows.Output>map(entry -> io.kestra.core.models.flows.Output
.builder()
.id(entry.getKey())
.value(entry.getValue())
.required(true)
.build()
)
)
.orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
.toList();
}
//endregion
if (subflowOutputs != null) {
if (subflowOutputs != null && !subflowOutputs.isEmpty()) {
try {
Map<String, Object> outputs = runContext.render(subflowOutputs);
Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(subflowOutputs, runContext);
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
if (flow.getOutputs() != null && flowInputOutput != null) {
outputs = flowInputOutput.typedOutputs(flow, execution, outputs);

View File

@@ -1,4 +1,5 @@
@PluginSubGroup(
title = "HTTP",
description = "This sub-group of plugins contains tasks for making HTTP requests.",
categories = PluginSubGroup.PluginCategory.STORAGE
)

View File

@@ -103,8 +103,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {
KVStore kvStore = runContext.namespaceKv(renderedNamespace);
if (kvType != null && renderedValue instanceof String renderedValueStr) {
renderedValue = switch (runContext.render(kvType).as(KVType.class).orElseThrow()) {
if (kvType != null){
KVType renderedKvType = runContext.render(kvType).as(KVType.class).orElseThrow();
if (renderedValue instanceof String renderedValueStr) {
renderedValue = switch (renderedKvType) {
case NUMBER -> JacksonMapper.ofJson().readValue(renderedValueStr, Number.class);
case BOOLEAN -> Boolean.parseBoolean((String) renderedValue);
case DATETIME, DATE -> Instant.parse(renderedValueStr);
@@ -112,7 +114,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {
case JSON -> JacksonMapper.toObject(renderedValueStr);
default -> renderedValue;
};
} else if (renderedValue instanceof Number valueNumber && renderedKvType == KVType.STRING) {
renderedValue = valueNumber.toString();
}
}
kvStore.put(renderedKey, new KVValueAndMetadata(
new KVMetadata(

View File

@@ -1,4 +1,8 @@
@PluginSubGroup(categories = PluginSubGroup.PluginCategory.CORE)
@PluginSubGroup(
title = "KV",
description = "This sub-group of plugins contains tasks for interacting with the key-value (KV) store.\n",
categories = PluginSubGroup.PluginCategory.CORE
)
package io.kestra.plugin.core.kv;
import io.kestra.core.models.annotations.PluginSubGroup;

View File

@@ -41,7 +41,7 @@ import jakarta.validation.constraints.Size;
You can access the request body and headers sent by another application using the following template variables:
- `{{ trigger.body }}`
- `{{ trigger.headers }}`.
- `{{ trigger.headers }}`
The webhook response will be one of the following HTTP status codes:
- 404 if the namespace, flow or webhook key is not found.
@@ -72,7 +72,7 @@ import jakarta.validation.constraints.Size;
),
@Example(
title = """
Add a trigger matching specific webhook event condition. The flow will be executed only if the condition is met.`.
Add a trigger matching specific webhook event condition. The flow will be executed only if the condition is met.
""",
code = """
id: condition_based_webhook_flow

View File

@@ -10,6 +10,7 @@ import org.junit.jupiter.api.Test;
import java.net.URI;
import java.util.Map;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -37,9 +38,9 @@ class VariablesTest {
@Test
@SuppressWarnings("unchecked")
void inStorage() {
var storageContext = StorageContext.forTask(null, "namespace", "flow", "execution", "task", "taskRun", null);
var storageContext = StorageContext.forTask(MAIN_TENANT, "namespace", "flow", "execution", "task", "taskRun", null);
var internalStorage = new InternalStorage(storageContext, storageInterface);
Variables.StorageContext variablesContext = new Variables.StorageContext(null, "namespace");
Variables.StorageContext variablesContext = new Variables.StorageContext(MAIN_TENANT, "namespace");
// simple
Map<String, Object> outputs = Map.of("key", "value");

View File

@@ -0,0 +1,86 @@
package io.kestra.core.reporter;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
class SchedulesTest {
@Test
void shouldTriggerAfterPeriodGivenEnoughTimeHasPassed() {
// Given
var schedule = Schedules.every(Duration.ofHours(1));
Instant now = Instant.now();
// When
boolean firstRun = schedule.shouldRun(now);
boolean fiveMinutesLater = schedule.shouldRun(now.plus(Duration.ofMinutes(5)));
boolean oneHourLater = schedule.shouldRun(now.plus(Duration.ofHours(1)));
// Then
assertThat(firstRun).isTrue();
assertThat(fiveMinutesLater).isFalse();
assertThat(oneHourLater).isTrue();
}
@Test
void shouldNotTriggerGivenPeriodHasNotElapsed() {
// Given
var schedule = Schedules.every(Duration.ofMinutes(30));
Instant now = Instant.now();
// When
boolean firstRun = schedule.shouldRun(now);
boolean almost30Minutes = schedule.shouldRun(now.plus(Duration.ofMinutes(29)));
// Then
assertThat(firstRun).isTrue();
assertThat(almost30Minutes).isFalse();
}
@Test
void shouldTriggerHourlyGivenOneHourHasElapsed() {
// Given
var schedule = Schedules.hourly();
Instant now = Instant.now();
// When
boolean firstRun = schedule.shouldRun(now);
boolean nextHour = schedule.shouldRun(now.plus(Duration.ofHours(1)));
// Then
assertThat(firstRun).isTrue();
assertThat(nextHour).isTrue();
}
@Test
void shouldTriggerDailyGivenOneDayHasElapsed() {
// Given
var schedule = Schedules.daily();
Instant now = Instant.now();
// When
boolean firstRun = schedule.shouldRun(now);
boolean nextDay = schedule.shouldRun(now.plus(Duration.ofDays(1)));
// Then
assertThat(firstRun).isTrue();
assertThat(nextDay).isTrue();
}
@Test
void shouldThrowExceptionGivenZeroOrNegativeDuration() {
// Given / When / Then
assertThatThrownBy(() -> Schedules.every(Duration.ZERO))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Period must be positive");
assertThatThrownBy(() -> Schedules.every(Duration.ofSeconds(-5)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Period must be positive");
}
}

View File

@@ -0,0 +1,33 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.reporter.Reportable;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
public abstract class AbstractFeatureUsageReportTest {
@Inject
FeatureUsageReport featureUsageReport;
@Test
public void shouldGetReport() {
// When
Instant now = Instant.now();
FeatureUsageReport.UsageEvent event = featureUsageReport.report(
now,
Reportable.TimeInterval.of(now.minus(Duration.ofDays(1)).atZone(ZoneId.systemDefault()), now.atZone(ZoneId.systemDefault()))
);
// Then
assertThat(event.getExecutions().getDailyExecutionsCount().size()).isGreaterThan(0);
assertThat(event.getExecutions().getDailyTaskRunsCount()).isNull();
}
}

View File

@@ -0,0 +1,81 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.collectors.ServiceUsage;
import io.kestra.core.reporter.Reportable;
import io.kestra.core.repositories.ServiceInstanceRepositoryInterface;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceInstance;
import io.kestra.core.server.ServiceType;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Set;
@KestraTest
public abstract class AbstractServiceUsageReportTest {
@Inject
ServiceUsageReport serviceUsageReport;
@Inject
ServiceInstanceRepositoryInterface serviceInstanceRepository;
@Test
public void shouldGetReport() {
// Given
final LocalDate start = LocalDate.now().withDayOfMonth(1);
final LocalDate end = start.withDayOfMonth(start.getMonth().length(start.isLeapYear()));
final ZoneId zoneId = ZoneId.systemDefault();
LocalDate from = start;
int days = 0;
// generate one month of service instance
while (from.toEpochDay() < end.toEpochDay()) {
Instant createAt = from.atStartOfDay(zoneId).toInstant();
Instant updatedAt = from.atStartOfDay(zoneId).plus(Duration.ofHours(10)).toInstant();
ServiceInstance instance = new ServiceInstance(
IdUtils.create(),
ServiceType.EXECUTOR,
Service.ServiceState.EMPTY,
null,
createAt,
updatedAt,
List.of(),
null,
Map.of(),
Set.of()
);
instance = instance
.state(Service.ServiceState.RUNNING, createAt)
.state(Service.ServiceState.NOT_RUNNING, updatedAt);
serviceInstanceRepository.save(instance);
from = from.plusDays(1);
days++;
}
// When
Instant now = end.plusDays(1).atStartOfDay(zoneId).toInstant();
ServiceUsageReport.ServiceUsageEvent event = serviceUsageReport.report(now,
Reportable.TimeInterval.of(start.atStartOfDay(zoneId), end.plusDays(1).atStartOfDay(zoneId))
);
// Then
List<ServiceUsage.DailyServiceStatistics> statistics = event.services().dailyStatistics();
Assertions.assertEquals(ServiceType.values().length - 1, statistics.size());
Assertions.assertEquals(
days,
statistics.stream().filter(it -> it.type().equalsIgnoreCase("EXECUTOR")).findFirst().get().values().size()
);
}
}

View File

@@ -0,0 +1,41 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.plugin.core.http.Trigger;
import io.kestra.plugin.core.log.Log;
import io.kestra.plugin.core.trigger.Schedule;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class PluginMetricReportTest {
@Inject
MetricRegistry metricRegistry;
@Inject
PluginMetricReport pluginMetricReport;
@Test
void shouldGetReport() {
// Given
metricRegistry.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, MetricRegistry.METRIC_WORKER_ENDED_DURATION_DESCRIPTION, MetricRegistry.TAG_TASK_TYPE, Log.class.getName())
.record(() -> Duration.ofSeconds(1));
metricRegistry.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, MetricRegistry.METRIC_WORKER_TRIGGER_DURATION_DESCRIPTION, MetricRegistry.TAG_TRIGGER_TYPE, Trigger.class.getName())
.record(() -> Duration.ofSeconds(1));
metricRegistry.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION_DESCRIPTION, MetricRegistry.TAG_TRIGGER_TYPE, Schedule.class.getName())
.record(() -> Duration.ofSeconds(1));
// When
PluginMetricReport.PluginMetricEvent event = pluginMetricReport.report(Instant.now());
// Then
assertThat(event.pluginMetrics()).hasSize(3);
}
}

View File

@@ -0,0 +1,68 @@
package io.kestra.core.reporter.reports;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.Setting;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.micronaut.test.annotation.MockBean;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class SystemInformationReportTest {
@Inject
private SystemInformationReport systemInformationReport;
@Test
void shouldGetReport() {
SystemInformationReport.SystemInformationEvent event = systemInformationReport.report(Instant.now());
assertThat(event.uri()).isEqualTo("https://mysuperhost.com/subpath");
assertThat(event.environments()).contains("test");
assertThat(event.startTime()).isNotNull();
assertThat(event.host().getUuid()).isNotNull();
assertThat(event.host().getHardware().getLogicalProcessorCount()).isNotNull();
assertThat(event.host().getJvm().getName()).isNotNull();
assertThat(event.host().getOs().getFamily()).isNotNull();
assertThat(event.configurations().getRepositoryType()).isEqualTo("memory");
assertThat(event.configurations().getQueueType()).isEqualTo("memory");
}
@MockBean(SettingRepositoryInterface.class)
@Singleton
static class TestSettingRepository implements SettingRepositoryInterface {
public static Object UUID = null;
@Override
public Optional<Setting> findByKey(String key) {
return Optional.empty();
}
@Override
public List<Setting> findAll() {
return new ArrayList<>();
}
@Override
public Setting save(Setting setting) throws ConstraintViolationException {
if (setting.getKey().equals(Setting.INSTANCE_UUID)) {
UUID = setting.getValue();
}
return setting;
}
@Override
public Setting delete(Setting setting) {
return setting;
}
}
}

View File

@@ -8,7 +8,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.plugin.core.debug.Return;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.junit.annotations.KestraTest;
@@ -26,6 +25,7 @@ import java.time.ZonedDateTime;
import java.util.Map;
import java.util.Objects;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
@@ -39,9 +39,6 @@ public abstract class AbstractExecutionServiceTest {
@Inject
LogRepositoryInterface logRepository;
@Inject
StorageInterface storageInterface;
@Inject
RunContextFactory runContextFactory;
@@ -56,12 +53,14 @@ public abstract class AbstractExecutionServiceTest {
Flow flow = Flow.builder()
.namespace("io.kestra.test")
.id("abc")
.tenantId(MAIN_TENANT)
.revision(1)
.build();
Execution execution = Execution
.builder()
.id(IdUtils.create())
.tenantId(MAIN_TENANT)
.state(state)
.flowId(flow.getId())
.namespace(flow.getNamespace())
@@ -74,6 +73,7 @@ public abstract class AbstractExecutionServiceTest {
.builder()
.namespace(flow.getNamespace())
.id(IdUtils.create())
.tenantId(MAIN_TENANT)
.executionId(execution.getId())
.flowId(flow.getId())
.taskId(task.getId())
@@ -94,6 +94,7 @@ public abstract class AbstractExecutionServiceTest {
for (int i = 0; i < 10; i++) {
logRepository.save(LogEntry.builder()
.executionId(execution.getId())
.tenantId(MAIN_TENANT)
.timestamp(Instant.now())
.message("Message " + i)
.flowId(flow.getId())
@@ -108,7 +109,7 @@ public abstract class AbstractExecutionServiceTest {
true,
true,
true,
null,
MAIN_TENANT,
flow.getNamespace(),
flow.getId(),
null,
@@ -126,7 +127,7 @@ public abstract class AbstractExecutionServiceTest {
true,
true,
true,
null,
MAIN_TENANT,
flow.getNamespace(),
flow.getId(),
null,

View File

@@ -387,6 +387,13 @@ public abstract class AbstractRunnerTest {
forEachItemCaseTest.forEachItemInIf();
}
@Test
@LoadFlows({"flows/valids/for-each-item-subflow-after-execution.yaml",
"flows/valids/for-each-item-after-execution.yaml"})
protected void forEachItemWithAfterExecution() throws Exception {
forEachItemCaseTest.forEachItemWithAfterExecution();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
void concurrencyCancel() throws Exception {
@@ -425,10 +432,16 @@ public abstract class AbstractRunnerTest {
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
void concurrencyQueueRestarted() throws Exception {
protected void concurrencyQueueRestarted() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
void concurrencyQueueAfterExecution() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
}
@Test
@ExecuteFlow("flows/valids/executable-fail.yml")
void badExecutable(Execution execution) {

View File

@@ -342,6 +342,55 @@ public class FlowConcurrencyCaseTest {
assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencyQueueAfterExecution() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
var executionResult1 = new AtomicReference<Execution>();
var executionResult2 = new AtomicReference<Execution>();
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
executionResult1.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch1.countDown();
}
}
if (e.getLeft().getId().equals(execution2.getId())) {
executionResult2.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
latch2.countDown();
}
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch3.countDown();
}
}
});
assertTrue(latch1.await(1, TimeUnit.MINUTES));
assertTrue(latch2.await(1, TimeUnit.MINUTES));
assertTrue(latch3.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
private URI storageUpload() throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.input.FileInput;
import io.kestra.core.models.flows.input.InputAndValue;
import io.kestra.core.models.flows.input.IntInput;
import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.property.Property;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.micronaut.http.MediaType;
@@ -21,7 +22,6 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -30,6 +30,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
@KestraTest
class FlowInputOutputTest {
@@ -70,8 +72,8 @@ class FlowInputOutputTest {
// Then
Assertions.assertEquals(
List.of(
new InputAndValue(input1, "value1", true, null),
new InputAndValue(input2, "value2", true, null)),
new InputAndValue(input1, "value1", true, false, null),
new InputAndValue(input2, "value2", true, false, null)),
values
);
}
@@ -103,9 +105,9 @@ class FlowInputOutputTest {
// Then
Assertions.assertEquals(
List.of(
new InputAndValue(input1, "v1", true, null),
new InputAndValue(input2, "v2", true, null),
new InputAndValue(input3, "v3", true, null)),
new InputAndValue(input1, "v1", true, false, null),
new InputAndValue(input2, "v2", true, false, null),
new InputAndValue(input3, "v3", true, false, null)),
values
);
}
@@ -137,9 +139,9 @@ class FlowInputOutputTest {
// Then
Assertions.assertEquals(
List.of(
new InputAndValue(input1, "v1", true, null),
new InputAndValue(input2, "v2", false, null),
new InputAndValue(input3, "v3", false, null)),
new InputAndValue(input1, "v1", true, false, null),
new InputAndValue(input2, "v2", false, false, null),
new InputAndValue(input3, "v3", false, false, null)),
values
);
}
@@ -167,8 +169,8 @@ class FlowInputOutputTest {
// Then
Assertions.assertEquals(
List.of(
new InputAndValue(input1, "value1", true, null),
new InputAndValue(input2, "value2", false, null)),
new InputAndValue(input1, "value1", true, false, null),
new InputAndValue(input2, "value2", false, false, null)),
values
);
}
@@ -200,7 +202,7 @@ class FlowInputOutputTest {
}
@Test
void shouldNotUploadFileInputAfterValidation() throws IOException {
void shouldNotUploadFileInputAfterValidation() {
// Given
FileInput input = FileInput
.builder()
@@ -215,7 +217,7 @@ class FlowInputOutputTest {
// Then
Assertions.assertNull(values.getFirst().exception());
Assertions.assertFalse(storageInterface.exists(null, null, URI.create(values.getFirst().value().toString())));
Assertions.assertFalse(storageInterface.exists(MAIN_TENANT, null, URI.create(values.getFirst().value().toString())));
}
@Test
@@ -223,13 +225,15 @@ class FlowInputOutputTest {
// Given
StringInput input1 = StringInput.builder()
.id("input1")
.type(Type.STRING)
.validator("\\d")
.defaults("0")
.defaults(Property.ofValue("0"))
.required(false)
.build();
IntInput input2 = IntInput.builder()
.type(Type.INT)
.id("input2")
.defaults(0)
.defaults(Property.ofValue(0))
.required(false)
.build();
@@ -243,8 +247,41 @@ class FlowInputOutputTest {
// Then
Assertions.assertEquals(
List.of(
new InputAndValue(input1, "0", true, null),
new InputAndValue(input2, 0, true, null)),
new InputAndValue(input1, "0", true, true, null),
new InputAndValue(input2, 0, true, true, null)),
values
);
}
@Test
void resolveInputsGivenDefaultExpressions() {
// Given
StringInput input1 = StringInput.builder()
.id("input1")
.type(Type.STRING)
.defaults(Property.ofExpression("{{ 'hello' }}"))
.required(false)
.build();
StringInput input2 = StringInput.builder()
.id("input2")
.type(Type.STRING)
.defaults(Property.ofExpression("{{ inputs.input1 }}_world"))
.required(false)
.dependsOn(new DependsOn(List.of("input1"),null))
.build();
List<Input<?>> inputs = List.of(input1, input2);
Map<String, Object> data = Map.of("input42", "foo");
// When
List<InputAndValue> values = flowInputOutput.resolveInputs(inputs, null, DEFAULT_TEST_EXECUTION, data);
// Then
Assertions.assertEquals(
List.of(
new InputAndValue(input1, "hello", true, true, null),
new InputAndValue(input2, "hello_world", true, true, null)),
values
);
}

View File

@@ -83,4 +83,24 @@ class RunContextPropertyTest {
runContextProperty = new RunContextProperty<>(Property.<Map<String, String>>builder().expression("{ \"key\": \"{{ key }}\"}").build(), runContext);
assertThat(runContextProperty.asMap(String.class, String.class, Map.of("key", "value"))).containsEntry("key", "value");
}
@Test
void asShouldReturnCachedRenderedProperty() throws IllegalVariableEvaluationException {
var runContext = runContextFactory.of();
var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
assertThat(runContextProperty.as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value1"));
}
@Test
void asShouldNotReturnCachedRenderedPropertyWithSkipCache() throws IllegalVariableEvaluationException {
var runContext = runContextFactory.of();
var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
assertThat(runContextProperty.skipCache().as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value2"));
}
}

View File

@@ -257,7 +257,7 @@ class RunContextTest {
@Test
void withDefaultInput() throws IllegalVariableEvaluationException {
Flow flow = Flow.builder().id("triggerWithDefaultInput").namespace("io.kestra.test").revision(1).inputs(List.of(StringInput.builder().id("test").type(Type.STRING).defaults("test").build())).build();
Flow flow = Flow.builder().id("triggerWithDefaultInput").namespace("io.kestra.test").revision(1).inputs(List.of(StringInput.builder().id("test").type(Type.STRING).defaults(io.kestra.core.models.property.Property.ofValue("test")).build())).build();
Execution execution = Execution.builder().id(IdUtils.create()).flowId("triggerWithDefaultInput").namespace("io.kestra.test").state(new State()).build();
RunContext runContext = runContextFactory.of(flow, execution);
@@ -267,7 +267,7 @@ class RunContextTest {
@Test
void withNullLabel() throws IllegalVariableEvaluationException {
Flow flow = Flow.builder().id("triggerWithDefaultInput").namespace("io.kestra.test").revision(1).inputs(List.of(StringInput.builder().id("test").type(Type.STRING).defaults("test").build())).build();
Flow flow = Flow.builder().id("triggerWithDefaultInput").namespace("io.kestra.test").revision(1).inputs(List.of(StringInput.builder().id("test").type(Type.STRING).defaults(io.kestra.core.models.property.Property.ofValue("test")).build())).build();
Execution execution = Execution.builder().id(IdUtils.create()).flowId("triggerWithDefaultInput").namespace("io.kestra.test").state(new State()).labels(List.of(new Label("key", null), new Label(null, "value"))).build();
RunContext runContext = runContextFactory.of(flow, execution);

View File

@@ -1,22 +1,25 @@
package io.kestra.core.runners;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.property.PropertyContext;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.micronaut.context.annotation.Property;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class RunVariablesTest {
private final PropertyContext propertyContext = Mockito.mock(PropertyContext.class);
@Test
@SuppressWarnings("unchecked")
void shouldGetEmptyVariables() {
Map<String, Object> variables = new RunVariables.DefaultBuilder().build(new RunContextLogger());
Map<String, Object> variables = new RunVariables.DefaultBuilder().build(new RunContextLogger(), propertyContext);
assertThat(variables.size()).isEqualTo(3);
assertThat((Map<String, Object>) variables.get("envs")).isEqualTo(Map.of());
assertThat((Map<String, Object>) variables.get("globals")).isEqualTo(Map.of());
@@ -33,7 +36,7 @@ class RunVariablesTest {
.revision(42)
.build()
)
.build(new RunContextLogger());
.build(new RunContextLogger(), propertyContext);
Assertions.assertEquals(Map.of(
"id", "id-value",
"namespace", "namespace-value",
@@ -52,7 +55,7 @@ class RunVariablesTest {
.tenantId("tenant-value")
.build()
)
.build(new RunContextLogger());
.build(new RunContextLogger(), propertyContext);
Assertions.assertEquals(Map.of(
"id", "id-value",
"namespace", "namespace-value",
@@ -75,7 +78,7 @@ class RunVariablesTest {
return "type-value";
}
})
.build(new RunContextLogger());
.build(new RunContextLogger(), propertyContext);
Assertions.assertEquals(Map.of("id", "id-value", "type", "type-value"), variables.get("task"));
}
@@ -93,7 +96,7 @@ class RunVariablesTest {
return "type-value";
}
})
.build(new RunContextLogger());
.build(new RunContextLogger(), propertyContext);
Assertions.assertEquals(Map.of("id", "id-value", "type", "type-value"), variables.get("trigger"));
}
@@ -102,7 +105,7 @@ class RunVariablesTest {
void shouldGetKestraConfiguration() {
Map<String, Object> variables = new RunVariables.DefaultBuilder()
.withKestraConfiguration(new RunVariables.KestraConfiguration("test", "http://localhost:8080"))
.build(new RunContextLogger());
.build(new RunContextLogger(), propertyContext);
assertThat(variables.size()).isEqualTo(4);
Map<String, Object> kestra = (Map<String, Object>) variables.get("kestra");
assertThat(kestra).hasSize(2);

View File

@@ -0,0 +1,39 @@
package io.kestra.core.runners.pebble.functions;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import io.kestra.core.utils.IdUtils;
import java.util.Map;
public class FunctionTestUtils {
public static final String NAMESPACE = "io.kestra.tests";
public static Map<String, Object> getVariables() {
return getVariables(NAMESPACE);
}
public static Map<String, Object> getVariables(String namespace) {
return Map.of(
"flow", Map.of(
"id", "kv",
"tenantId", MAIN_TENANT,
"namespace", namespace)
);
}
public static Map<String, Object> getVariablesWithExecution(String namespace) {
return getVariablesWithExecution(namespace, IdUtils.create());
}
public static Map<String, Object> getVariablesWithExecution(String namespace, String executionId) {
return Map.of(
"flow", Map.of(
"id", "flow",
"namespace", namespace,
"tenantId", MAIN_TENANT),
"execution", Map.of("id", executionId)
);
}
}

View File

@@ -1,39 +1,24 @@
package io.kestra.core.runners.pebble.functions;
import static io.kestra.core.runners.pebble.functions.FunctionTestUtils.getVariables;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.VariableRenderer;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.InternalKVStore;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.debug.Return;
import io.pebbletemplates.pebble.error.PebbleException;
import jakarta.inject.Inject;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
@KestraTest(startRunner = true)
public class KvFunctionTest {
@@ -46,20 +31,16 @@ public class KvFunctionTest {
@BeforeEach
void reset() throws IOException {
storageInterface.deleteByPrefix(null, null, URI.create(StorageContext.kvPrefix("io.kestra.tests")));
storageInterface.deleteByPrefix(MAIN_TENANT, null, URI.create(StorageContext.kvPrefix("io.kestra.tests")));
}
@Test
void shouldGetValueFromKVGivenExistingKey() throws IllegalVariableEvaluationException, IOException {
// Given
KVStore kv = new InternalKVStore(null, "io.kestra.tests", storageInterface);
KVStore kv = new InternalKVStore(MAIN_TENANT, "io.kestra.tests", storageInterface);
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getVariables("io.kestra.tests");
// When
String rendered = variableRenderer.render("{{ kv('my-key') }}", variables);
@@ -71,17 +52,13 @@ public class KvFunctionTest {
@Test
void shouldGetValueFromKVGivenExistingKeyWithInheritance() throws IllegalVariableEvaluationException, IOException {
// Given
KVStore kv = new InternalKVStore(null, "my.company", storageInterface);
KVStore kv = new InternalKVStore(MAIN_TENANT, "my.company", storageInterface);
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
KVStore firstKv = new InternalKVStore(null, "my", storageInterface);
KVStore firstKv = new InternalKVStore(MAIN_TENANT, "my", storageInterface);
firstKv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "firstValue")));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "my.company.team")
);
Map<String, Object> variables = getVariables("my.company.team");
// When
String rendered = variableRenderer.render("{{ kv('my-key') }}", variables);
@@ -93,14 +70,10 @@ public class KvFunctionTest {
@Test
void shouldNotGetValueFromKVWithGivenNamespaceAndInheritance() throws IOException {
// Given
KVStore kv = new InternalKVStore(null, "kv", storageInterface);
KVStore kv = new InternalKVStore(MAIN_TENANT, "kv", storageInterface);
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "my.company.team")
);
Map<String, Object> variables = getVariables("my.company.team");
// When
Assertions.assertThrows(IllegalVariableEvaluationException.class, () ->
@@ -110,14 +83,10 @@ public class KvFunctionTest {
@Test
void shouldGetValueFromKVGivenExistingAndNamespace() throws IllegalVariableEvaluationException, IOException {
// Given
KVStore kv = new InternalKVStore(null, "kv", storageInterface);
KVStore kv = new InternalKVStore(MAIN_TENANT, "kv", storageInterface);
kv.put("my-key", new KVValueAndMetadata(null, Map.of("field", "value")));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getVariables("io.kestra.tests");
// When
String rendered = variableRenderer.render("{{ kv('my-key', namespace='kv') }}", variables);
@@ -129,11 +98,7 @@ public class KvFunctionTest {
@Test
void shouldGetEmptyGivenNonExistingKeyAndErrorOnMissingFalse() throws IllegalVariableEvaluationException {
// Given
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getVariables("io.kestra.tests");
// When
String rendered = variableRenderer.render("{{ kv('my-key', errorOnMissing=false) }}", variables);
@@ -145,11 +110,7 @@ public class KvFunctionTest {
@Test
void shouldFailGivenNonExistingKeyAndErrorOnMissingTrue() {
// Given
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getVariables("io.kestra.tests");
// When
IllegalVariableEvaluationException exception = Assertions.assertThrows(IllegalVariableEvaluationException.class, () -> {
@@ -163,11 +124,7 @@ public class KvFunctionTest {
@Test
void shouldFailGivenNonExistingKeyUsingDefaults() {
// Given
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "kv",
"namespace", "io.kestra.tests")
);
Map<String, Object> variables = getVariables("io.kestra.tests");
// When
IllegalVariableEvaluationException exception = Assertions.assertThrows(IllegalVariableEvaluationException.class, () -> {
variableRenderer.render("{{ kv('my-key') }}", variables);
@@ -176,4 +133,5 @@ public class KvFunctionTest {
// Then
assertThat(exception.getMessage()).isEqualTo("io.pebbletemplates.pebble.error.PebbleException: The key 'my-key' does not exist in the namespace 'io.kestra.tests'. ({{ kv('my-key') }}:1)");
}
}

View File

@@ -20,6 +20,9 @@ import java.net.URI;
import java.nio.file.Files;
import java.util.Map;
import static io.kestra.core.runners.pebble.functions.FunctionTestUtils.NAMESPACE;
import static io.kestra.core.runners.pebble.functions.FunctionTestUtils.getVariables;
import static io.kestra.core.runners.pebble.functions.FunctionTestUtils.getVariablesWithExecution;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -35,48 +38,39 @@ class ReadFileFunctionTest {
@Test
void readNamespaceFile() throws IllegalVariableEvaluationException, IOException {
String namespace = "io.kestra.tests";
String filePath = "file.txt";
storageInterface.createDirectory(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace)));
storageInterface.put(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
storageInterface.createDirectory(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE)));
storageInterface.put(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
String render = variableRenderer.render("{{ render(read('" + filePath + "')) }}", Map.of("flow", Map.of("namespace", namespace, "tenantId", MAIN_TENANT)));
assertThat(render).isEqualTo("Hello from " + namespace);
String render = variableRenderer.render("{{ render(read('" + filePath + "')) }}", getVariables());
assertThat(render).isEqualTo("Hello from " + NAMESPACE);
}
@Test
void readNamespaceFileFromURI() throws IllegalVariableEvaluationException, IOException {
String namespace = "io.kestra.tests";
String filePath = "file.txt";
storageInterface.createDirectory(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace)));
storageInterface.put(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
storageInterface.createDirectory(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE)));
storageInterface.put(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + filePath), new ByteArrayInputStream("Hello from {{ flow.namespace }}".getBytes()));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "flow",
"namespace", namespace,
"tenantId", MAIN_TENANT),
"execution", Map.of("id", IdUtils.create())
);
Map<String, Object> variables = getVariablesWithExecution(NAMESPACE);
String render = variableRenderer.render("{{ render(read(fileURI('" + filePath + "'))) }}", variables);
assertThat(render).isEqualTo("Hello from " + namespace);
assertThat(render).isEqualTo("Hello from " + NAMESPACE);
}
@Test
void readNamespaceFileWithNamespace() throws IllegalVariableEvaluationException, IOException {
String namespace = "io.kestra.tests";
String filePath = "file.txt";
storageInterface.createDirectory(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace)));
storageInterface.put(MAIN_TENANT, namespace, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/" + filePath), new ByteArrayInputStream("Hello but not from flow.namespace".getBytes()));
storageInterface.createDirectory(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE)));
storageInterface.put(MAIN_TENANT, NAMESPACE, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + filePath), new ByteArrayInputStream("Hello but not from flow.namespace".getBytes()));
String render = variableRenderer.render("{{ read('" + filePath + "', namespace='" + namespace + "') }}", Map.of("flow", Map.of("namespace", "flow.namespace", "tenantId", MAIN_TENANT)));
String render = variableRenderer.render("{{ read('" + filePath + "', namespace='" + NAMESPACE + "') }}", getVariables("different.namespace"));
assertThat(render).isEqualTo("Hello but not from flow.namespace");
}
@Test
void readUnknownNamespaceFile() {
IllegalVariableEvaluationException illegalVariableEvaluationException = assertThrows(IllegalVariableEvaluationException.class, () -> variableRenderer.render("{{ read('unknown.txt') }}", Map.of("flow", Map.of("namespace", "io.kestra.tests"))));
IllegalVariableEvaluationException illegalVariableEvaluationException = assertThrows(IllegalVariableEvaluationException.class, () -> variableRenderer.render("{{ read('unknown.txt') }}", getVariables()));
assertThat(illegalVariableEvaluationException.getCause().getCause().getClass()).isEqualTo(FileNotFoundException.class);
}
@@ -90,13 +84,7 @@ class ReadFileFunctionTest {
URI internalStorageFile = storageInterface.put(MAIN_TENANT, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes()));
// test for an authorized execution
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", flowId,
"namespace", namespace,
"tenantId", MAIN_TENANT),
"execution", Map.of("id", executionId)
);
Map<String, Object> variables = getVariablesWithExecution(namespace, executionId);
String render = variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables);
assertThat(render).isEqualTo("Hello from a task output");
@@ -169,13 +157,7 @@ class ReadFileFunctionTest {
URI internalStorageURI = URI.create("/" + namespace.replace(".", "/") + "/" + flowId + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion");
URI internalStorageFile = storageInterface.put(MAIN_TENANT, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes()));
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "notme",
"namespace", "notme",
"tenantId", MAIN_TENANT),
"execution", Map.of("id", "notme")
);
Map<String, Object> variables = getVariablesWithExecution("notme", "notme");
String render = variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables);
assertThat(render).isEqualTo("Hello from a task output");
@@ -191,13 +173,7 @@ class ReadFileFunctionTest {
@Test
void shouldFailProcessingUnsupportedScheme() {
Map<String, Object> variables = Map.of(
"flow", Map.of(
"id", "notme",
"namespace", "notme",
"tenantId", MAIN_TENANT),
"execution", Map.of("id", "notme")
);
Map<String, Object> variables = getVariablesWithExecution("notme", "notme");
assertThrows(IllegalArgumentException.class, () -> variableRenderer.render("{{ read('unsupported://path-to/file.txt') }}", variables));
}

View File

@@ -90,13 +90,13 @@ abstract public class AbstractSchedulerTest {
.type(Type.STRING)
.id("testInputs")
.required(false)
.defaults("test")
.defaults(Property.ofValue("test"))
.build(),
StringInput.builder()
.type(Type.STRING)
.id("def")
.required(false)
.defaults("awesome")
.defaults(Property.ofValue("awesome"))
.build()
))
.revision(1)
@@ -110,7 +110,7 @@ abstract public class AbstractSchedulerTest {
.tasks(Collections.singletonList(Return.builder()
.id("test")
.type(Return.class.getName())
.format(new Property<>("{{ inputs.testInputs }}"))
.format(Property.ofExpression("{{ inputs.testInputs }}"))
.build()));
if (list != null) {

View File

@@ -1,106 +0,0 @@
package io.kestra.core.services;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.Helpers;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.ServerType;
import io.kestra.core.models.Setting;
import io.kestra.core.models.collectors.Usage;
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.kestra.plugin.core.http.Trigger;
import io.kestra.plugin.core.log.Log;
import io.kestra.plugin.core.trigger.Schedule;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.annotation.Requires;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.junit.jupiter.api.Test;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import jakarta.validation.ConstraintViolationException;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class CollectorServiceTest {
@Test
public void metrics() throws URISyntaxException {
ImmutableMap<String, Object> properties = ImmutableMap.of("kestra.server-type", ServerType.STANDALONE.name());
try (ApplicationContext applicationContext = Helpers.applicationContext(properties).start()) {
MetricRegistry metricRegistry = applicationContext.getBean(MetricRegistry.class);
// inject fake metrics to have plugin metrics
metricRegistry.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, MetricRegistry.METRIC_WORKER_ENDED_DURATION_DESCRIPTION, MetricRegistry.TAG_TASK_TYPE, Log.class.getName())
.record(() -> Duration.ofSeconds(1));
metricRegistry.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, MetricRegistry.METRIC_WORKER_TRIGGER_DURATION_DESCRIPTION, MetricRegistry.TAG_TRIGGER_TYPE, Trigger.class.getName())
.record(() -> Duration.ofSeconds(1));
metricRegistry.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_EVALUATION_DURATION_DESCRIPTION, MetricRegistry.TAG_TRIGGER_TYPE, Schedule.class.getName())
.record(() -> Duration.ofSeconds(1));
CollectorService collectorService = applicationContext.getBean(CollectorService.class);
Usage metrics = collectorService.metrics(true);
assertThat(metrics.getUri()).isEqualTo("https://mysuperhost.com/subpath");
assertThat(metrics.getUuid()).isNotNull();
assertThat(metrics.getVersion()).isNotNull();
assertThat(metrics.getStartTime()).isNotNull();
assertThat(metrics.getEnvironments()).contains("test");
assertThat(metrics.getStartTime()).isNotNull();
assertThat(metrics.getHost().getUuid()).isNotNull();
assertThat(metrics.getHost().getHardware().getLogicalProcessorCount()).isNotNull();
assertThat(metrics.getHost().getJvm().getName()).isNotNull();
assertThat(metrics.getHost().getOs().getFamily()).isNotNull();
assertThat(metrics.getConfigurations().getRepositoryType()).isEqualTo("memory");
assertThat(metrics.getConfigurations().getQueueType()).isEqualTo("memory");
assertThat(metrics.getExecutions()).isNotNull();
// 1 per hour
assertThat(metrics.getExecutions().getDailyExecutionsCount().size()).isGreaterThan(0);
// no task runs as it's an empty instance
assertThat(metrics.getExecutions().getDailyTaskRunsCount()).isNull();
assertThat(metrics.getInstanceUuid()).isEqualTo(TestSettingRepository.instanceUuid);
// we have 3 metrics so we should have the info for the related plugins
assertThat(metrics.getPluginMetrics()).hasSize(3);
}
}
@Singleton
@Requires(property = "kestra.unittest")
@Primary
public static class TestSettingRepository implements SettingRepositoryInterface {
public static Object instanceUuid = null;
@Override
public Optional<Setting> findByKey(String key) {
return Optional.empty();
}
@Override
public List<Setting> findAll() {
return new ArrayList<>();
}
@Override
public Setting save(Setting setting) throws ConstraintViolationException {
if (setting.getKey().equals(Setting.INSTANCE_UUID)) {
TestSettingRepository.instanceUuid = setting.getValue();
}
return setting;
}
@Override
public Setting delete(Setting setting) {
return setting;
}
}
}

View File

@@ -0,0 +1,142 @@
package io.kestra.core.services;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.core.log.Log;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.repositories.AbstractFlowRepositoryTest.TEST_NAMESPACE;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class FlowTriggerServiceTest {
public static final List<Label> EMPTY_LABELS = List.of();
public static final Optional<MultipleConditionStorageInterface> EMPTY_MULTIPLE_CONDITION_STORAGE = Optional.empty();
@Inject
private TestRunContextFactory runContextFactory;
@Inject
private ConditionService conditionService;
@Inject
private FlowService flowService;
private FlowTriggerService flowTriggerService;
@BeforeEach
void setUp() {
flowTriggerService = new FlowTriggerService(conditionService, runContextFactory, flowService);
}
@Test
void computeExecutionsFromFlowTriggers_ok() {
var simpleFlow = aSimpleFlow();
var flowWithFlowTrigger = Flow.builder()
.id("flow-with-flow-trigger")
.namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT)
.tasks(List.of(simpleLogTask()))
.triggers(List.of(
flowTriggerWithNoConditions()
))
.build();
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.SUCCESS);
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
simpleFlowExecution,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
);
assertThat(resultingExecutionsToRun).size().isEqualTo(1);
assertThat(resultingExecutionsToRun.get(0).getFlowId()).isEqualTo(flowWithFlowTrigger.getId());
}
@Test
void computeExecutionsFromFlowTriggers_filteringOutCreatedExecutions() {
var simpleFlow = aSimpleFlow();
var flowWithFlowTrigger = Flow.builder()
.id("flow-with-flow-trigger")
.namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT)
.tasks(List.of(simpleLogTask()))
.triggers(List.of(
flowTriggerWithNoConditions()
))
.build();
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.CREATED);
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
simpleFlowExecution,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
);
assertThat(resultingExecutionsToRun).size().isEqualTo(0);
}
@Test
void computeExecutionsFromFlowTriggers_filteringOutTestExecutions() {
var simpleFlow = aSimpleFlow();
var flowWithFlowTrigger = Flow.builder()
.id("flow-with-flow-trigger")
.namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT)
.tasks(List.of(simpleLogTask()))
.triggers(List.of(
flowTriggerWithNoConditions()
))
.build();
var simpleFlowExecutionComingFromATest = Execution.newExecution(simpleFlow, EMPTY_LABELS)
.withState(State.Type.SUCCESS)
.toBuilder()
.kind(ExecutionKind.TEST)
.build();
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
simpleFlowExecutionComingFromATest,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
);
assertThat(resultingExecutionsToRun).size().isEqualTo(0);
}
private static Flow aSimpleFlow() {
return Flow.builder()
.id("simple-flow")
.namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT)
.tasks(List.of(simpleLogTask()))
.build();
}
private static io.kestra.plugin.core.trigger.Flow flowTriggerWithNoConditions() {
return io.kestra.plugin.core.trigger.Flow.builder()
.id("flowTrigger")
.type(io.kestra.plugin.core.trigger.Flow.class.getName())
.build();
}
private static Log simpleLogTask() {
return Log.builder()
.id(IdUtils.create())
.type(Log.class.getName())
.message("Hello World")
.build();
}
}

View File

@@ -1,7 +1,8 @@
package io.kestra.core.services;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.*;
import io.micronaut.test.annotation.MockBean;
@@ -26,25 +27,25 @@ class KVStoreServiceTest {
@Test
void shouldGetKVStoreForExistingNamespaceGivenFromNull() {
Assertions.assertNotNull(storeService.get(null, TEST_EXISTING_NAMESPACE, null));
Assertions.assertNotNull(storeService.get(MAIN_TENANT, TEST_EXISTING_NAMESPACE, null));
}
@Test
void shouldThrowExceptionWhenAccessingKVStoreForNonExistingNamespace() {
KVStoreException exception = Assertions.assertThrows(KVStoreException.class, () -> storeService.get(null, "io.kestra.unittest.unknown", null));
KVStoreException exception = Assertions.assertThrows(KVStoreException.class, () -> storeService.get(MAIN_TENANT, "io.kestra.unittest.unknown", null));
Assertions.assertTrue(exception.getMessage().contains("namespace 'io.kestra.unittest.unknown' does not exist"));
}
@Test
void shouldGetKVStoreForAnyNamespaceWhenAccessingFromChildNamespace() {
Assertions.assertNotNull(storeService.get(null, "io.kestra", TEST_EXISTING_NAMESPACE));
Assertions.assertNotNull(storeService.get(MAIN_TENANT, "io.kestra", TEST_EXISTING_NAMESPACE));
}
@Test
void shouldGetKVStoreFromNonExistingNamespaceWithAKV() throws IOException {
KVStore kvStore = new InternalKVStore(null, "system", storageInterface);
KVStore kvStore = new InternalKVStore(MAIN_TENANT, "system", storageInterface);
kvStore.put("key", new KVValueAndMetadata(new KVMetadata("myDescription", Duration.ofHours(1)), "value"));
Assertions.assertNotNull(storeService.get(null, "system", null));
Assertions.assertNotNull(storeService.get(MAIN_TENANT, "system", null));
}
@MockBean(NamespaceService.class)

View File

@@ -27,6 +27,7 @@ import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
@@ -91,7 +92,7 @@ class InternalKVStoreTest {
kv.put(TEST_KV_KEY, new KVValueAndMetadata(new KVMetadata(description, Duration.ofMinutes(5)), complexValue));
// Then
StorageObject withMetadata = storageInterface.getWithMetadata(null, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
StorageObject withMetadata = storageInterface.getWithMetadata(MAIN_TENANT, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
String valueFile = new String(withMetadata.inputStream().readAllBytes());
Instant expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate"));
assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(4))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(6)))).isTrue();
@@ -102,7 +103,7 @@ class InternalKVStoreTest {
kv.put(TEST_KV_KEY, new KVValueAndMetadata(new KVMetadata(null, Duration.ofMinutes(10)), "some-value"));
// Then
withMetadata = storageInterface.getWithMetadata(null, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
withMetadata = storageInterface.getWithMetadata(MAIN_TENANT, kv.namespace(), URI.create("/" + kv.namespace().replace(".", "/") + "/_kv/my-key.ion"));
valueFile = new String(withMetadata.inputStream().readAllBytes());
expirationDate = Instant.parse(withMetadata.metadata().get("expirationDate"));
assertThat(expirationDate.isAfter(before.plus(Duration.ofMinutes(9))) && expirationDate.isBefore(before.plus(Duration.ofMinutes(11)))).isTrue();
@@ -176,6 +177,6 @@ class InternalKVStoreTest {
private InternalKVStore kv() {
final String namespaceId = "io.kestra." + IdUtils.create();
return new InternalKVStore(null, namespaceId, storageInterface);
return new InternalKVStore(MAIN_TENANT, namespaceId, storageInterface);
}
}

View File

@@ -17,8 +17,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.is;
class InternalNamespaceTest {
@@ -38,7 +38,7 @@ class InternalNamespaceTest {
void shouldGetAllNamespaceFiles() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
// When
namespace.putFile(Path.of("/sub/dir/file1.txt"), new ByteArrayInputStream("1".getBytes()));
@@ -56,7 +56,7 @@ class InternalNamespaceTest {
void shouldPutFileGivenNoTenant() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
// When
NamespaceFile namespaceFile = namespace.putFile(Path.of("/sub/dir/file.txt"), new ByteArrayInputStream("1".getBytes()));
@@ -73,7 +73,7 @@ class InternalNamespaceTest {
void shouldSucceedPutFileGivenExistingFileForConflictOverwrite() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
@@ -92,7 +92,7 @@ class InternalNamespaceTest {
void shouldFailPutFileGivenExistingFileForError() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
@@ -109,7 +109,7 @@ class InternalNamespaceTest {
void shouldIgnorePutFileGivenExistingFileForSkip() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt"));
@@ -128,7 +128,7 @@ class InternalNamespaceTest {
void shouldFindAllMatchingGivenNoTenant() throws IOException, URISyntaxException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
// When
namespace.putFile(Path.of("/a/b/c/1.sql"), new ByteArrayInputStream("1".getBytes()));
@@ -171,7 +171,7 @@ class InternalNamespaceTest {
void shouldReturnNoNamespaceFileForEmptyNamespace() throws IOException {
// Given
final String namespaceId = "io.kestra." + IdUtils.create();
final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface);
final InternalNamespace namespace = new InternalNamespace(logger, MAIN_TENANT, namespaceId, storageInterface);
List<NamespaceFile> namespaceFiles = namespace.findAllFilesMatching((unused) -> true);
assertThat(namespaceFiles.size()).isZero();
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.test;
import io.kestra.core.test.flow.AssertionResult;
import io.kestra.core.test.flow.AssertionRunError;
import io.kestra.core.test.flow.UnitTestResult;
import org.junit.jupiter.api.Test;
@@ -78,6 +79,34 @@ class TestSuiteRunResultTest {
assertThat(res).extracting(TestSuiteRunResult::state).isEqualTo(TestState.FAILED);
}
@Test
void one_testcase_error() {
var res = TestSuiteRunResult.of("id", "testSuiteId", "namespace", "flowId", Instant.now(), Instant.now(),
List.of(
UnitTestResult.of("id", "type", "executionId", URI.create("url"),
List.of(
SUCCESSFUL_ASSERTION
),
List.of(),
null
),
UnitTestResult.of("id", "type", "executionId", URI.create("url"),
List.of(
FAILING_ASSERTION
),
List.of(),
null
),
UnitTestResult.of("id", "type", "executionId", URI.create("url"),
List.of(),
List.of(new AssertionRunError("assertion failed", "assertion failed details")),
null
)
)
);
assertThat(res).extracting(TestSuiteRunResult::state).isEqualTo(TestState.ERROR);
}
@Test
void one_testcase_skipped() {
var skippedTestcaseId = "skipped_testcase_id";

Some files were not shown because too many files have changed in this diff Show More