Compare commits

...

109 Commits

Author SHA1 Message Date
Florian Hussonnois
c0baedf7f1 fix(flows): enhance error logs when injecting default values
Avoid logging the full exception stack trace when an error occurs
while injecting plugin defaults. Those errors can be logged often
and may be temporary.
2025-04-22 14:12:14 +02:00
dependabot[bot]
a06421dd84 build(deps): bump co.elastic.logging:logback-ecs-encoder
Bumps [co.elastic.logging:logback-ecs-encoder](https://github.com/elastic/ecs-logging-java) from 1.6.0 to 1.7.0.
- [Release notes](https://github.com/elastic/ecs-logging-java/releases)
- [Commits](https://github.com/elastic/ecs-logging-java/compare/v1.6.0...v1.7.0)

---
updated-dependencies:
- dependency-name: co.elastic.logging:logback-ecs-encoder
  dependency-version: 1.7.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-22 11:23:18 +02:00
dependabot[bot]
de6fcab785 build(deps): bump org.opensearch.client:opensearch-java
Bumps [org.opensearch.client:opensearch-java](https://github.com/opensearch-project/opensearch-java) from 2.22.0 to 2.23.0.
- [Release notes](https://github.com/opensearch-project/opensearch-java/releases)
- [Changelog](https://github.com/opensearch-project/opensearch-java/blob/v2.23.0/CHANGELOG.md)
- [Commits](https://github.com/opensearch-project/opensearch-java/compare/v2.22.0...v2.23.0)

---
updated-dependencies:
- dependency-name: org.opensearch.client:opensearch-java
  dependency-version: 2.23.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-22 11:22:01 +02:00
dependabot[bot]
b2f68a7b97 build(deps): bump com.github.oshi:oshi-core from 6.8.0 to 6.8.1
Bumps [com.github.oshi:oshi-core](https://github.com/oshi/oshi) from 6.8.0 to 6.8.1.
- [Release notes](https://github.com/oshi/oshi/releases)
- [Changelog](https://github.com/oshi/oshi/blob/master/CHANGELOG.md)
- [Commits](https://github.com/oshi/oshi/compare/oshi-parent-6.8.0...oshi-parent-6.8.1)

---
updated-dependencies:
- dependency-name: com.github.oshi:oshi-core
  dependency-version: 6.8.1
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-22 11:10:17 +02:00
dependabot[bot]
01cb30f933 build(deps): bump software.amazon.awssdk:bom from 2.31.21 to 2.31.25
Bumps software.amazon.awssdk:bom from 2.31.21 to 2.31.25.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.31.25
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-22 11:09:06 +02:00
dependabot[bot]
d2cda63cfa build(deps): bump flyingSaucerVersion from 9.11.6 to 9.12.0
Bumps `flyingSaucerVersion` from 9.11.6 to 9.12.0.

Updates `org.xhtmlrenderer:flying-saucer-core` from 9.11.6 to 9.12.0
- [Release notes](https://github.com/flyingsaucerproject/flyingsaucer/releases)
- [Changelog](https://github.com/flyingsaucerproject/flyingsaucer/blob/main/CHANGELOG.md)
- [Commits](https://github.com/flyingsaucerproject/flyingsaucer/compare/v9.11.6...v9.12.0)

Updates `org.xhtmlrenderer:flying-saucer-pdf` from 9.11.6 to 9.12.0
- [Release notes](https://github.com/flyingsaucerproject/flyingsaucer/releases)
- [Changelog](https://github.com/flyingsaucerproject/flyingsaucer/blob/main/CHANGELOG.md)
- [Commits](https://github.com/flyingsaucerproject/flyingsaucer/compare/v9.11.6...v9.12.0)

---
updated-dependencies:
- dependency-name: org.xhtmlrenderer:flying-saucer-core
  dependency-version: 9.12.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
- dependency-name: org.xhtmlrenderer:flying-saucer-pdf
  dependency-version: 9.12.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-22 11:08:11 +02:00
dependabot[bot]
f89187db6a build(deps): bump io.micronaut.platform:micronaut-platform
Bumps [io.micronaut.platform:micronaut-platform](https://github.com/micronaut-projects/micronaut-platform) from 4.8.0 to 4.8.2.
- [Release notes](https://github.com/micronaut-projects/micronaut-platform/releases)
- [Commits](https://github.com/micronaut-projects/micronaut-platform/compare/v4.8.0...v4.8.2)

---
updated-dependencies:
- dependency-name: io.micronaut.platform:micronaut-platform
  dependency-version: 4.8.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-22 11:07:01 +02:00
dependabot[bot]
8e4fe892e9 build(deps): bump com.google.guava:guava from 33.4.7-jre to 33.4.8-jre
Bumps [com.google.guava:guava](https://github.com/google/guava) from 33.4.7-jre to 33.4.8-jre.
- [Release notes](https://github.com/google/guava/releases)
- [Commits](https://github.com/google/guava/commits)

---
updated-dependencies:
- dependency-name: com.google.guava:guava
  dependency-version: 33.4.8-jre
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-22 11:04:45 +02:00
Barthélémy Ledoux
eb13dce0ff fix(ui): use creating=true instead of identifier=new (#8443) 2025-04-22 09:46:02 +02:00
Florian Hussonnois
a14518b810 refactor(plugin): add dedicated service to parse task log lines
Add a new TaskLogLineMatcher class for matching and capturing
structured data from task execution logs.

Related-to: kestra-io/kestra-ee#3441
2025-04-22 09:37:32 +02:00
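The commit only names the new class; purely as an illustration of the idea (matching a task log line and capturing structured fields), a regex with named groups could look like the sketch below. The pattern, field names, and method are hypothetical and not the actual TaskLogLineMatcher API.

```java
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: the real TaskLogLineMatcher API is not shown in this commit.
final class LogLineMatcherSketch {
    // Hypothetical log format: "2025-04-22T09:37:32 INFO taskId=hello some message"
    private static final Pattern LINE = Pattern.compile(
        "^(?<timestamp>\\S+)\\s+(?<level>[A-Z]+)\\s+taskId=(?<taskId>\\S+)\\s+(?<message>.*)$");

    static Optional<Map<String, String>> match(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(Map.of(
            "timestamp", m.group("timestamp"),
            "level", m.group("level"),
            "taskId", m.group("taskId"),
            "message", m.group("message")
        ));
    }
}
```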
Piyush Bhaskar
c64f15a035 fix(container): update tab container classes for better layout handling. (#8456) 2025-04-22 12:19:29 +05:30
Piyush Bhaskar
f79541616e chore(ui): Improve the display of Resume/Kill executions (#8227)
* chore(ui): Improve the display of Resume/Kill executions

* minor tweak

---------

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
2025-04-21 11:22:35 +05:30
Frank Tianyu Zeng
cb6a6bfd91 fix(revision): restore side-by-side view in revision history after flow edit (#8439)
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
2025-04-21 11:12:36 +05:30
Ludovic DEHON
b2ae2ff6f7 feat(build): fix some sonar alert 2025-04-20 22:23:41 +02:00
Ludovic DEHON
a7836ca673 feat(build): add sonar 2025-04-19 00:49:09 +02:00
YannC
7bc60a1056 fix(ui): add new property in filters (#8444) 2025-04-18 21:27:55 +02:00
YannC
09943a1e7b fix(webserver): set ids parameters as body instead of queryvalue (#8438) 2025-04-18 11:58:11 +02:00
Sayed Murtadha Ahmed
8b6af7a808 chore(core)*: make sure that notifications with large message text are closeable (#8431)
There was a problem where, if an error message was too big, the toaster could not be closed. This change amends that issue.

Closes https://github.com/kestra-io/kestra/issues/8352.
2025-04-17 11:44:17 +02:00
YannC.
e74e7ff8e5 fix(ui): correct path for namespace files on namespace page
close #8140
2025-04-17 10:34:46 +02:00
Anna Geller
9adf3a5444 fix(docs): improve PublishMetrics example and basic.md KV doc (#8427) 2025-04-16 20:02:36 +02:00
Karuna Tata
c069b2fbb3 chore(core)*: deboucing task validation from no code editor (#8418)
There was an issue when a user edited a task from the No Code editor—the validation endpoint was being called excessively, instead of waiting for the user to finish typing. This has now been resolved with this PR.

Closes https://github.com/kestra-io/kestra/issues/7073.

Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-04-16 12:31:03 +02:00
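The actual fix lives in the Vue front end; purely to illustrate the debouncing idea, here is a minimal sketch in Java using a ScheduledExecutorService: each new call cancels the previously scheduled one, so the expensive action (here, validation) only runs once the input has been quiet for the configured delay. All names below are illustrative.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Generic debouncer: only the last call within the quiet period triggers the action.
final class Debouncer {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long delayMillis;
    private ScheduledFuture<?> pending;

    Debouncer(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    synchronized void call(Runnable action) {
        if (pending != null) {
            pending.cancel(false); // drop the previously scheduled run
        }
        pending = scheduler.schedule(action, delayMillis, TimeUnit.MILLISECONDS);
    }
}

// Usage sketch: debounce validation while the user types.
// Debouncer debouncer = new Debouncer(500);
// onKeyStroke(() -> debouncer.call(() -> validate(currentTaskYaml)));
```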
YannC
534b7f4ec7 refactor(core): changes to improve openapi spec
* better method name
* correct annotation to describe the body
2025-04-16 12:26:32 +02:00
Miloš Paunović
7b1ee4a9e0 refactor(core): prevent inheriting unused attributes in components (#8419)
Removes the default inheritance of unused attributes in Vue components. These pollute the console with warnings and are not necessary, so this PR solves the problem.
2025-04-16 12:08:45 +02:00
Miloš Paunović
38a9ebcbef refactor(core): replace deprecated vue flow options parameter (#8417)
Replaces usage of the deprecated options object parameter with the id one, as per the latest API guidelines. This change ensures compatibility with future versions and removes the related console warning.

Related to #7804.
2025-04-16 10:29:47 +02:00
weibo1
eea47c6e40 feat(system): add duration metrics for handle() method 2025-04-16 09:05:00 +02:00
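A minimal sketch of what a duration metric around a handle() method can look like with Micrometer (which this project already depends on); the metric name, description, and registry wiring below are assumptions for illustration, not taken from the commit.

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

class HandleTimingSketch {
    private final MeterRegistry registry = new SimpleMeterRegistry(); // illustrative registry
    private final Timer handleTimer = Timer.builder("worker.handle.duration") // hypothetical metric name
        .description("Time spent in handle()")
        .register(registry);

    void handleWithMetrics(Runnable handle) {
        // record() measures how long the wrapped call takes
        handleTimer.record(handle);
    }
}
```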
weibo1
e1c4ae22f2 feat(system): add index to commonly queried fields in the WHERE conditions of the triggers table 2025-04-16 09:05:00 +02:00
Miloš Paunović
b7861a139e chore(deps): regular dependency update (#8415)
Performing a weekly round of dependency updates in the NPM ecosystem to keep everything up to date.
2025-04-16 08:46:32 +02:00
Miloš Paunović
954d64ecaa fix(core)*: allow horizontal scrolling of tabs in multi panel view (#8414)
If multiple tabs were opened in a single panel of the new multi panel view, there was no way to scroll and see the overflowing ones. With the changes in this PR, users can now do just that.

Closes https://github.com/kestra-io/kestra/issues/8270.
2025-04-16 12:05:37 +05:30
Karuna Tata
f61ba36023 feat(core)*: allow moving of entire panels (#8377)
Changes in this pull request now allow users to move entire panels wherever they want; in addition, there are `Move right` and `Move left` options in the context menu.

Closes https://github.com/kestra-io/kestra/issues/8272.

Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-04-16 07:47:09 +02:00
Loïc Mathieu
fbd989ccab feat(core): allow to specify Pause task resume behavior (#8242)
When a Pause task is resumed, either because it waited until the end of its duration or because it was resumed manually, it can now use a behavior to describe what to do next: resume, warn, fail, or cancel the execution.

Fixes #8242
2025-04-15 17:39:53 +02:00
yuri
12affd4b4b fix(build): amend Node Gradle to reflect changes (#7970) 2025-04-15 15:15:34 +02:00
Satvik Kushwaha
b75730a0ca chore(dashboards)*: amend hover color of table top buttons on dashboard (#8388)
Tables on the main dashboard that have `See all` buttons needed a hover color tweak, which is handled in this pull request.

Closes https://github.com/kestra-io/kestra/issues/8376.

Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-04-15 14:14:49 +02:00
Barthélémy Ledoux
4170615765 fix(ui): navigation between flow editor and tabs - blank screen (#8392) 2025-04-15 11:41:18 +02:00
Barthélémy Ledoux
5cfb6aa1f5 test(ui): ignore unhandled canceled promises in storybook (#8394) 2025-04-15 11:39:39 +02:00
AJ Emerich
41d660e18e docs(core): update plugin titles and descriptions (#8390) 2025-04-15 11:22:47 +02:00
yuri
8af4f1928a chore(webserver): amend copy&pasted description (#8346) 2025-04-15 11:12:09 +02:00
github-actions[bot]
1488caccc7 chore(core): localize to languages other than english (#8391)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-04-15 11:09:33 +02:00
Karuna Tata
85fc48963f feat(core): add a confirmation dialog before leaving the settings page (#8365)
If the user changed something on the `Settings` page and then navigated to another page, all new changes would be lost. This PR adds a confirmation dialog before leaving the route, if any changes were made, to confirm either saving or discarding them.

Closes https://github.com/kestra-io/kestra/issues/8364.

Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-04-15 10:31:49 +02:00
Barthélémy Ledoux
b706dec9d2 test(core): make storybook tests pass with less warnings (#8382)
The many warnings we see in the UI unit tests make them flaky. In this PR we try to avoid that pollution as much as possible so we can at least test.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-04-15 10:16:14 +02:00
Karuna Tata
ceac4d38f9 feat(core)*: implement redirection to flow creation page on skipping tutorial (#8333)
When a user clicks the `Skip tutorial` button, it just shuts the tutorial down but leaves the user on the same page. This change redirects users to the `Flow Creation` page when the `Skip tutorial` button is clicked.

Closes https://github.com/kestra-io/kestra/issues/8326.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-04-15 08:11:41 +02:00
dependabot[bot]
ec7bf52e08 build(deps-dev): bump vite in /ui in the npm_and_yarn group (#8362)
Bumps the npm_and_yarn group in /ui with 1 update: [vite](https://github.com/vitejs/vite/tree/HEAD/packages/vite).


Updates `vite` from 6.2.5 to 6.2.6
- [Release notes](https://github.com/vitejs/vite/releases)
- [Changelog](https://github.com/vitejs/vite/blob/v6.2.6/packages/vite/CHANGELOG.md)
- [Commits](https://github.com/vitejs/vite/commits/v6.2.6/packages/vite)

---
updated-dependencies:
- dependency-name: vite
  dependency-version: 6.2.6
  dependency-type: direct:development
  dependency-group: npm_and_yarn
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-04-14 15:22:09 +02:00
Loïc Mathieu
247594299c feat(system)!: Describe our internal metrics
Add a description to all our internal metrics.

BREAKING CHANGE: the following metrics have been removed:
- executor.taskrun.next.count
- executor.workertaskresult.count

The metric scheduler.execution.running.duration has been renamed to scheduler.execution.lock.duration
2025-04-14 15:17:23 +02:00
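For reference, Micrometer attaches descriptions when a meter is built, and backends such as Prometheus expose them as help text. A minimal sketch (the metric name and registry below are illustrative, not the project's actual wiring):

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

class MetricDescriptionSketch {
    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry(); // illustrative registry

        // Descriptions are set at build time; the metric name here is illustrative.
        Counter started = Counter.builder("executor.execution.started.count")
            .description("Number of executions started by the executor")
            .register(registry);

        started.increment();
        System.out.println(registry.get("executor.execution.started.count").counter().count());
    }
}
```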
Loïc Mathieu
513139976c feat(system): Allow to describe metrics (#1989) 2025-04-14 15:17:23 +02:00
Miloš Paunović
2cab9de57c refactor(namespaces): remove the unused maximize property (#8380)
There is no longer a need for the `maximize` property to be passed for locked tabs, so it is removed in this commit.
2025-04-14 13:55:45 +02:00
dependabot[bot]
cfae13c045 build(deps): bump io.micrometer:micrometer-core from 1.14.5 to 1.14.6
Bumps [io.micrometer:micrometer-core](https://github.com/micrometer-metrics/micrometer) from 1.14.5 to 1.14.6.
- [Release notes](https://github.com/micrometer-metrics/micrometer/releases)
- [Commits](https://github.com/micrometer-metrics/micrometer/compare/v1.14.5...v1.14.6)

---
updated-dependencies:
- dependency-name: io.micrometer:micrometer-core
  dependency-version: 1.14.6
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-14 11:55:59 +02:00
dependabot[bot]
6190f8774a build(deps): bump com.google.guava:guava from 33.4.6-jre to 33.4.7-jre
Bumps [com.google.guava:guava](https://github.com/google/guava) from 33.4.6-jre to 33.4.7-jre.
- [Release notes](https://github.com/google/guava/releases)
- [Commits](https://github.com/google/guava/commits)

---
updated-dependencies:
- dependency-name: com.google.guava:guava
  dependency-version: 33.4.7-jre
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-14 10:55:50 +02:00
dependabot[bot]
b98a0a783d build(deps): bump commons-io:commons-io from 2.18.0 to 2.19.0
Bumps commons-io:commons-io from 2.18.0 to 2.19.0.

---
updated-dependencies:
- dependency-name: commons-io:commons-io
  dependency-version: 2.19.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-14 10:10:24 +02:00
dependabot[bot]
3da2dc6257 build(deps): bump flyingSaucerVersion from 9.11.5 to 9.11.6
Bumps `flyingSaucerVersion` from 9.11.5 to 9.11.6.

Updates `org.xhtmlrenderer:flying-saucer-core` from 9.11.5 to 9.11.6
- [Release notes](https://github.com/flyingsaucerproject/flyingsaucer/releases)
- [Changelog](https://github.com/flyingsaucerproject/flyingsaucer/blob/main/CHANGELOG.md)
- [Commits](https://github.com/flyingsaucerproject/flyingsaucer/compare/v9.11.5...v9.11.6)

Updates `org.xhtmlrenderer:flying-saucer-pdf` from 9.11.5 to 9.11.6
- [Release notes](https://github.com/flyingsaucerproject/flyingsaucer/releases)
- [Changelog](https://github.com/flyingsaucerproject/flyingsaucer/blob/main/CHANGELOG.md)
- [Commits](https://github.com/flyingsaucerproject/flyingsaucer/compare/v9.11.5...v9.11.6)

---
updated-dependencies:
- dependency-name: org.xhtmlrenderer:flying-saucer-core
  dependency-version: 9.11.6
  dependency-type: direct:production
  update-type: version-update:semver-patch
- dependency-name: org.xhtmlrenderer:flying-saucer-pdf
  dependency-version: 9.11.6
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-14 10:10:06 +02:00
dependabot[bot]
6feb027696 build(deps): bump org.aspectj:aspectjweaver from 1.9.23 to 1.9.24
Bumps [org.aspectj:aspectjweaver](https://github.com/eclipse/org.aspectj) from 1.9.23 to 1.9.24.
- [Release notes](https://github.com/eclipse/org.aspectj/releases)
- [Commits](https://github.com/eclipse/org.aspectj/commits)

---
updated-dependencies:
- dependency-name: org.aspectj:aspectjweaver
  dependency-version: 1.9.24
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-14 10:09:21 +02:00
dependabot[bot]
83d6095669 build(deps): bump software.amazon.awssdk:bom from 2.31.16 to 2.31.21
Bumps software.amazon.awssdk:bom from 2.31.16 to 2.31.21.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.31.21
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-14 10:09:02 +02:00
dependabot[bot]
444e3d2a77 build(deps): bump software.amazon.awssdk.crt:aws-crt
Bumps [software.amazon.awssdk.crt:aws-crt](https://github.com/awslabs/aws-crt-java) from 0.37.0 to 0.38.1.
- [Release notes](https://github.com/awslabs/aws-crt-java/releases)
- [Commits](https://github.com/awslabs/aws-crt-java/compare/v0.37.0...v0.38.1)

---
updated-dependencies:
- dependency-name: software.amazon.awssdk.crt:aws-crt
  dependency-version: 0.38.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-14 10:08:28 +02:00
Satvik Kushwaha
752405ac78 fix(flows)*: amend text color problem in no code editor on firefox (#8367)
There was an issue with the title color of tasks in the `No Code` editor (it stayed dark no matter which mode was selected), but only when viewed in the `Firefox` browser. That is sorted out with this pull request.

Closes https://github.com/kestra-io/kestra/issues/8327.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-04-14 09:35:39 +02:00
Miloš Paunović
860c1b218c feat(core)*: allow closing all tabs at once in a single panel (#8360)
Adds the ability to close all tabs in a single panel with a single click, improving usability and eliminating the need to close each tab individually.

Closes https://github.com/kestra-io/kestra/issues/8273.
2025-04-11 15:14:04 +02:00
Barthélémy Ledoux
0bd017556a fix(ui): command array should display properly in no-code (#8349) 2025-04-11 14:47:17 +02:00
Miloš Paunović
63ec5cab27 build(core): update commit message and description for translation-related pull requests (#8358)
This change updates the commit message and description generated by the translation CI process to be more descriptive and consistent with our commit conventions. It improves the clarity and traceability of automated translation updates in the commit history.
2025-04-11 14:38:31 +02:00
brian-mulier-p
e73f15a538 fix(core): bring back documentation on some plugins (#8354)
Swagger is now resolved from the Kestra classpath

closes #8265
2025-04-11 13:00:53 +02:00
github-actions[bot]
f871fa838e chore(translations)*: localize to languages other than english (#8353) 2025-04-11 12:11:38 +02:00
Miloš Paunović
c2028759e4 feat(core)*: introduce a dropdown menu for each panel (#8351)
This is the initial work to support the two related issues linked below: it introduces a context menu for each tab panel, which gives us a place to list related actions.

Relates to https://github.com/kestra-io/kestra/issues/8272.
Relates to https://github.com/kestra-io/kestra/issues/8273.
2025-04-11 11:56:04 +02:00
Nicolas K.
21d6e0fa62 test(system): rework concurrency paused tests so they actually test t… (#8339)
* test(system): rework concurrency paused tests so they actually test the pause + concurrency behavior

* test(system): Add a flow name check in the queue because kafka queue is not cleaned between tests

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-04-11 11:16:01 +02:00
Bart Ledoux
ab666bff11 test(ui): failing test for autocompletion after contributor's PR 2025-04-11 09:42:23 +02:00
Miloš Paunović
92c082e2e0 chore(flows)*: hide horizontal scroll on editor based inputs (#8347)
There was always a horizontal scrollbar present on the editor component when used as an input field; this was previously handled for the No Code editor in https://github.com/kestra-io/kestra/pull/8216.

Relates to https://github.com/kestra-io/kestra-ee/issues/3404.
2025-04-11 08:09:22 +02:00
Frank Tianyu Zeng
5a3a54fd57 feat(variables): add missing functions to editor autocompletion (#8245)
closes #7733

---------

Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-04-10 23:32:40 +02:00
Florian Hussonnois
1576051ebb fix(flow): re-enable NotEmpty constraint on property tasks for Sequential 2025-04-10 18:03:35 +02:00
Loïc Mathieu
4ec2a5d064 chore(system): don't emit a workertaskresult twice for RUNNING
We emit a WorkerTaskResult in RUNNING, then create an attempt, emit a new WorkerTaskResult with the attempt, and then start running the task.
We only need to emit a WorkerTaskResult once, as the second would be emitted microseconds later and is just noise.
2025-04-10 18:02:42 +02:00
Florian Hussonnois
846e20a100 fix(flow): enhance error handling when injecting plugin defaults
Add a new checked exception, FlowProcessingException, to enhance error
handling when parsing and validating a flow and when injecting plugin defaults.

Related-to: #7894
2025-04-10 17:13:40 +02:00
Barthélémy Ledoux
433a332123 feat: use TaskTasks.vue to show subtask (#8332) 2025-04-10 16:59:10 +02:00
Loïc Mathieu
9b8b240d7c feat(core): use a fixed-size mask to mask secrets (#8131)
Fixes 8131
2025-04-10 15:17:05 +02:00
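The point of a fixed-size mask is that the replacement no longer leaks the secret's length. A minimal sketch of the general idea (not Kestra's actual masking code; the mask string and method are assumptions):

```java
import java.util.List;

final class SecretMaskSketch {
    private static final String MASK = "******"; // fixed size, regardless of the secret's length

    static String mask(String output, List<String> secrets) {
        String result = output;
        for (String secret : secrets) {
            if (secret != null && !secret.isEmpty()) {
                result = result.replace(secret, MASK);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(mask("token=abc123XYZ", List.of("abc123XYZ"))); // prints: token=******
    }
}
```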
Miloš Paunović
e6937c4a8c feat(core)*: close code tab if file is deleted (#8331)
In the new multi panel view, if we have a `code` tab opened and delete that file from the `files` panel, we now automatically close the opened tab for the file in question.

Closes https://github.com/kestra-io/kestra/issues/8271.
2025-04-10 14:51:43 +02:00
Nicolas K.
70dbe6a219 test(system): fix flaky runner test by adding sleep in subtask (#8329)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-04-10 14:41:21 +02:00
Mathieu Gabelle
8ceee4cfdc refactor: migrate plugin.core flowable tasks to dynamic properties (#8313)
* refactor: migrate plugin.core flowable tasks to dynamic properties

migrated properties to dynamic properties where possible
updated tests accordingly
removed unused imports and fixed sonar issues
2025-04-10 13:59:57 +02:00
Florian Hussonnois
b97347df97 fix(flow): handle parsing error when reading flows from repository
This commit fixes an NPE in JdbcExecutor that could occur when
reading an invalid flow, and adds better handling of parsing errors in the JDBC flow
repositories.

Related-to: #7894
2025-04-10 13:28:11 +02:00
Piyush Bhaskar
b177a1f304 chore(ui): Add a trigger from Triggers tab (#7754)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
2025-04-10 11:11:34 +02:00
Piyush Bhaskar
999406aee4 fix(ui): uniform scrollbar across UI (#7758)
* scrollbar  uniforming

* fix(ui): uniform scrollbar across ui.

* minor tweaks

* minor tweak

---------

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
2025-04-10 11:10:46 +02:00
Barthélémy Ledoux
31fd0303b5 fix: make monaco composition API (#8320)
* refactor: make monaco composition API

* fix kv

* rename var in monaco

* enable autocomplete in new panel

* fix: set theme even in diff mode

* refactor: avoid using computed when unnecessary
2025-04-10 10:57:28 +02:00
Florian Hussonnois
32b4a9e0be fix(core): handle parsing error when reading flows from repository
This commit fixes a regression after applying plugin default
versions in the AbstractJdbcFlowRepository
2025-04-09 22:58:24 +02:00
Barthélémy Ledoux
2c411a27cc build: use storybook test addon to get coverage (#8316) 2025-04-09 20:53:53 +02:00
Florian Hussonnois
504ff282ef fix: add missing indices for service instance table 2025-04-09 19:01:42 +02:00
Nicolas K.
365d82eb96 feat(#233)!: remove warningOnError behavior (#8321)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-04-09 18:00:20 +02:00
YannC
9a67466d9c chore(deps): upgrade micronaut-platform to 4.8.0 (#8232)
* chore(deps): upgrade micronaut-platform to 4.8.0

* fix: http client version enforcement is not needed anymore as docker-java has updated the dependency

* fix: make applicationContext protected in SchedulerPollingTriggerTest

* fix: upgrade docker-java version to 3.5.0

* fix: make some methods protected in AbstractRunnerTest so they can be overridden
2025-04-09 17:06:38 +02:00
Nicolas K.
a04db6510b feat(makefile): add pull-plugins script and fix clone-plugins script (#8283)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-04-09 17:03:05 +02:00
Will Russell
ab2fea660a doc(Core): task example indentation error 2025-04-09 15:25:25 +01:00
ben8t
1c9016ba45 fix(sanitycheck): update flows id and namespace
* fix: sleep assert comparison

* fix: edit sanitycheck according to new process

* fix: typo
2025-04-09 14:46:47 +02:00
Miloš Paunović
935ff944a5 fix(ui): make sure that dropping a tab places it in the right place (#8315)
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
2025-04-09 14:41:01 +02:00
Will Russell
fd4667383c doc(Devcontainer): link instructions to contributing guide 2025-04-09 12:15:44 +01:00
Loïc Mathieu
5bec2fa8b3 feat(cli,core)!: change KESTRA_ env prefix to ENV_
This avoids a possible security issue: env vars can be used to override Kestra configuration properties, and Kestra-related configuration properties start with `kestra`.

Fixes https://github.com/kestra-io/kestra-ee/issues/3131
2025-04-09 12:48:54 +02:00
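As a rough sketch of why the prefix matters: only environment variables starting with the configured prefix are exposed to flows, so moving from `KESTRA_` to `ENV_` keeps Kestra's own configuration variables out of that set. The mapping below is an illustration of the idea only, not Kestra's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

final class EnvVarsSketch {
    // Collect variables exposed to flow expressions from environment variables
    // that start with the configured prefix (now ENV_ instead of KESTRA_).
    static Map<String, String> envVariables(Map<String, String> env, String prefix) {
        Map<String, String> result = new HashMap<>();
        env.forEach((key, value) -> {
            if (key.startsWith(prefix)) {
                // e.g. ENV_MY_TOKEN -> my_token
                result.put(key.substring(prefix.length()).toLowerCase(), value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> fake = Map.of("ENV_MY_TOKEN", "abc", "KESTRA_CONFIGURATION", "secret");
        System.out.println(envVariables(fake, "ENV_")); // {my_token=abc}
    }
}
```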
Piyush Bhaskar
c24fcfd0ca chore(ui): Add a trigger from Triggers tab (#7754)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
Co-authored-by: Barthélémy Ledoux <ledouxb@me.com>
2025-04-09 11:25:34 +02:00
Piyush Bhaskar
22424c69b6 chore(ui): improve the filter label colors on hover (#8141) 2025-04-09 11:17:18 +02:00
Bart Ledoux
938b3cc08b tests: fix storybook multipanel test 2025-04-09 11:15:14 +02:00
Piyush Bhaskar
5fdf2f3085 chore(ui): amend scrollbar issue on the no code editor fields (#8216) 2025-04-09 11:10:07 +02:00
Tanvir Ahmed
062957982b fix: [#8282] - Fixed issue with Devcontainer not running on arm64 architecture (#8285)
* fix: [#8282] - Fixed issue with Devcontainer not running on arm64 architecture

* chore: Updated implementation of dynamic install of Java JDK
2025-04-09 10:07:16 +01:00
Bart Ledoux
c08213b48d ci: remove some odd arguments to frontend checkout 2025-04-09 10:49:25 +02:00
Karuna Tata
152d96f018 chore(ui): remove reorder buttons when there's only one value in no code arrays (#8266) 2025-04-09 09:52:02 +02:00
AJ Emerich
4865843b10 fix(docs): add titles and examples to Dashboard chart types (#8233)
* fix(docs): add titles to Dashboard chart types

* fix(docs): add examples to Dashboard chart types

* fix(docs): add bar chart example
2025-04-09 09:19:46 +02:00
Miloš Paunović
47309a0782 chore(deps): regular dependency update (#8310) 2025-04-09 08:11:58 +02:00
Bart Ledoux
7d7340b4ba fix: remove the big plus when dragging 2025-04-08 16:11:58 +02:00
Bart Ledoux
eb1509959c chore: remove console.log drop 2025-04-08 15:50:59 +02:00
Nicolas K.
5285bea930 feat(Unit Tests) #8171 convert hamcrest to assertj (#8276)
* feat(Unit Tests) #8171 convert hamcrest to assertj

* fix(Unit Tests) #8171 failing unit test after assertj migration

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-04-08 15:48:54 +02:00
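The pattern applied throughout the test diffs further down is mechanical: Hamcrest's assertThat(actual, matcher) becomes AssertJ's fluent assertThat(actual) chain. A small before/after sketch with illustrative values:

```java
// Hamcrest style (before):
//   import static org.hamcrest.MatcherAssert.assertThat;
//   import static org.hamcrest.Matchers.containsString;
//   import static org.hamcrest.Matchers.is;
//
//   assertThat(call, is(0));
//   assertThat(out.toString(), containsString("1 flow(s)"));

// AssertJ style (after):
import static org.assertj.core.api.Assertions.assertThat;

class AssertJMigrationSketch {
    void example(Integer call, String output) {
        assertThat(call).isZero();
        assertThat(output).contains("1 flow(s)");
        assertThat(output).doesNotContain("MissingParameterException: ");
    }
}
```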
Bart Ledoux
cc5a1c9f68 fix: remove props warning in flow editor 2025-04-08 15:47:04 +02:00
MilosPaunovic
725e5e5d78 fix(ui): allow tab button to take effect while using editor 2025-04-08 15:09:30 +02:00
Bart Ledoux
56b903b8fd fix: repair docs home page 2025-04-08 15:05:08 +02:00
Miloš Paunović
21c0c86238 fix(ui): amend namespace files creation & editing problems (#8279) 2025-04-08 13:43:20 +02:00
Miloš Paunović
65ab695001 chore(ui): pass prop to maximize the height of namespace file editor (#8278) 2025-04-08 13:34:33 +02:00
Miloš Paunović
1310e59cf9 chore(ui): amend the sizing of editor panels (#8277) 2025-04-08 13:30:22 +02:00
YannC
c35352e2b4 feat: deprecate BOOLEAN input in favor of the new BOOL input with no undefined state (#8257)
close #8225
2025-04-08 10:22:11 +02:00
Miloš Paunović
59589b1c2d refactor(ui): uniform the namespace components for oss and ee versions (#8256) 2025-04-08 09:29:19 +02:00
brian.mulier
4729430a00 feat(ui): ability to hide secret value when typing in secrets
closes kestra-io/kestra-ee#3358
2025-04-08 00:10:22 +02:00
brian.mulier
1aa37d5756 fix(core): duration defaults are rendered as duration string instead of timestamp
closes #3742
2025-04-07 22:06:24 +02:00
brian.mulier
1a121951d6 fix(jdbc): conflict on migration numbers 2025-04-07 21:59:14 +02:00
YannC
92ee3f749e fix: correct version migration for H2 sql file 2025-04-07 19:20:49 +02:00
631 changed files with 10177 additions and 9188 deletions

View File

@@ -1,5 +1,6 @@
FROM ubuntu:24.04
ARG BUILDPLATFORM
ARG DEBIAN_FRONTEND=noninteractive
USER root
@@ -31,9 +32,23 @@ ENV SHELL=/bin/zsh
# --------------------------------------
# Java
# --------------------------------------
RUN wget https://download.oracle.com/java/21/latest/jdk-21_linux-x64_bin.deb
RUN dpkg -i ./jdk-21_linux-x64_bin.deb
ENV JAVA_HOME=/usr/java/jdk-21-oracle-x64
ARG OS_ARCHITECTURE
RUN mkdir -p /usr/java
RUN echo "Building on platform: $BUILDPLATFORM"
RUN case "$BUILDPLATFORM" in \
"linux/amd64") OS_ARCHITECTURE="linux-x64" ;; \
"linux/arm64") OS_ARCHITECTURE="linux-aarch64" ;; \
"darwin/amd64") OS_ARCHITECTURE="macos-x64" ;; \
"darwin/arm64") OS_ARCHITECTURE="macos-aarch64" ;; \
*) echo "Unsupported BUILDPLATFORM: $BUILDPLATFORM" && exit 1 ;; \
esac && \
wget "https://aka.ms/download-jdk/microsoft-jdk-21.0.6-$OS_ARCHITECTURE.tar.gz" && \
mv "microsoft-jdk-21.0.6-$OS_ARCHITECTURE.tar.gz" microsoft-jdk-21.0.6.tar.gz
RUN tar -xzvf microsoft-jdk-21.0.6.tar.gz && \
mv jdk-21.0.6+7 jdk-21 && \
mv jdk-21 /usr/java/
ENV JAVA_HOME=/usr/java/jdk-21
ENV PATH="$PATH:$JAVA_HOME/bin"
# Will load a custom configuration file for Micronaut
ENV MICRONAUT_ENVIRONMENTS=local,override

View File

@@ -37,6 +37,10 @@ The following dependencies are required to build Kestra locally:
- Docker & Docker Compose
- an IDE (Intellij IDEA, Eclipse or VS Code)
Thanks to the Kestra community, if using VSCode, you can also start development on either the frontend or backend with a bootstrapped docker container without the need to manually set up the environment.
Check out the [README](../.devcontainer/README.md) for set-up instructions and the associated [Dockerfile](../.devcontainer/Dockerfile) in the repository to get started.
To start contributing:
- [Fork](https://docs.github.com/en/github/getting-started-with-github/fork-a-repo) the repository
- Clone the fork on your workstation:
@@ -46,7 +50,7 @@ git clone git@github.com:{YOUR_USERNAME}/kestra.git
cd kestra
```
#### Develop backend
#### Develop on the backend
The backend is made with [Micronaut](https://micronaut.io).
Open the cloned repository in your favorite IDE. In most decent IDEs, the Gradle build will be detected and all dependencies will be downloaded.
@@ -72,7 +76,7 @@ python3 -m pip install virtualenv
```
#### Develop frontend
#### Develop on the frontend
The frontend is made with [Vue.js](https://vuejs.org/) and located on the `/ui` folder.
- `npm install`

View File

@@ -62,7 +62,7 @@ jobs:
- name: Build with Gradle
if: ${{ matrix.language == 'java' }}
run: ./gradlew testClasses -x :ui:installFrontend -x :ui:assembleFrontend
run: ./gradlew testClasses -x :ui:assembleFrontend
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)

View File

@@ -62,6 +62,6 @@ jobs:
echo "No changes to commit. Exiting with success."
exit 0
fi
git commit -m "chore(translations): localize to languages other than English"
git commit -m "chore(core): localize to languages other than english" -m "Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference."
git push -u origin $BRANCH_NAME || (git push origin --delete $BRANCH_NAME && git push -u origin $BRANCH_NAME)
gh pr create --title "Translations from en.json" --body "This PR was created automatically by a GitHub Action." --base develop --head $BRANCH_NAME --assignee anna-geller --reviewer anna-geller

View File

@@ -8,6 +8,9 @@ on:
env:
JAVA_VERSION: '21'
permissions:
contents: read
jobs:
dependency-check:
name: Dependency Check
@@ -57,6 +60,10 @@ jobs:
develop-image-check:
name: Image Check (develop)
runs-on: ubuntu-latest
permissions:
contents: read
security-events: write
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
@@ -83,13 +90,25 @@ jobs:
uses: aquasecurity/trivy-action@0.30.0
with:
image-ref: kestra/kestra:develop
format: table
format: 'template'
template: '@/contrib/sarif.tpl'
severity: 'CRITICAL,HIGH'
output: 'trivy-results.sarif'
skip-dirs: /app/plugins
scanners: vuln
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
category: docker-
latest-image-check:
name: Image Check (latest)
runs-on: ubuntu-latest
permissions:
contents: read
security-events: write
actions: read
steps:
# Checkout
- uses: actions/checkout@v4
@@ -118,4 +137,11 @@ jobs:
image-ref: kestra/kestra:latest
format: table
skip-dirs: /app/plugins
scanners: vuln
scanners: vuln
severity: 'CRITICAL,HIGH'
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'

View File

@@ -31,6 +31,8 @@ jobs:
steps:
- uses: actions/checkout@v4
name: Checkout - Current ref
with:
fetch-depth: 0
# Setup build
- uses: kestra-io/actions/.github/actions/setup-build@main

View File

@@ -19,11 +19,8 @@ jobs:
name: Frontend - Tests
runs-on: ubuntu-latest
steps:
- id: checkout
name: Checkout - Current ref
- name: Checkout
uses: actions/checkout@v4
with:
ref: ${{ github.head_ref }}
- name: Npm - install
shell: bash
@@ -44,28 +41,15 @@ jobs:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
run: npm run build
- name: Run front-end unit tests
shell: bash
working-directory: ui
run: npm run test:cicd
- name: Storybook - Install Playwright
shell: bash
working-directory: ui
run: npx playwright install --with-deps
- name: Storybook - Build
- name: Run front-end unit tests
shell: bash
working-directory: ui
run: npm run build-storybook --quiet
- name: Storybook - Run tests
shell: bash
working-directory: ui
run: |
npx concurrently -k -s first -n "SB,TEST" -c "magenta,blue" \
"npx http-server storybook-static --port 6006 --silent" \
"npx wait-on tcp:127.0.0.1:6006 && npm run test:storybook"
run: npm run test:cicd
- name: Codecov - Upload coverage reports
uses: codecov/codecov-action@v5

View File

@@ -181,8 +181,8 @@ clone-plugins:
@echo "Using PLUGIN_GIT_DIR: $(PLUGIN_GIT_DIR)"
@mkdir -p "$(PLUGIN_GIT_DIR)"
@echo "Fetching repository list from GitHub..."
@REPOS=$(gh repo list kestra-io -L 1000 --json name | jq -r .[].name | sort | grep "^plugin-") \
for repo in $$REPOS; do \
@REPOS=$$(gh repo list kestra-io -L 1000 --json name | jq -r .[].name | sort | grep "^plugin-"); \
for repo in $$REPOS; do \
if [[ $$repo == plugin-* ]]; then \
if [ -d "$(PLUGIN_GIT_DIR)/$$repo" ]; then \
echo "Skipping: $$repo (Already cloned)"; \
@@ -194,6 +194,22 @@ clone-plugins:
done
@echo "Done!"
# Pull every plugins in main or master branch
pull-plugins:
@echo "🔍 Pulling repositories in '$(PLUGIN_GIT_DIR)'..."
@for repo in "$(PLUGIN_GIT_DIR)"/*; do \
if [ -d "$$repo/.git" ]; then \
branch=$$(git -C "$$repo" rev-parse --abbrev-ref HEAD); \
if [[ "$$branch" == "master" || "$$branch" == "main" ]]; then \
echo "🔄 Pulling: $$(basename "$$repo") (branch: $$branch)"; \
git -C "$$repo" pull; \
else \
echo "❌ Skipping: $$(basename "$$repo") (Not on master or main branch, currently on $$branch)"; \
fi; \
fi; \
done
@echo "✅ Done pulling!"
# Update all plugins jar
build-plugins:
@echo "🔍 Scanning repositories in '$(PLUGIN_GIT_DIR)'..."

View File

@@ -196,6 +196,9 @@ subprojects {
testImplementation 'org.hamcrest:hamcrest'
testImplementation 'org.hamcrest:hamcrest-library'
testImplementation 'org.exparity:hamcrest-date'
//assertj
testImplementation 'org.assertj:assertj-core'
}
test {
@@ -213,8 +216,8 @@ subprojects {
environment 'SECRET_WEBHOOK_KEY', "secretKey".bytes.encodeBase64().toString()
environment 'SECRET_NON_B64_SECRET', "some secret value"
environment 'SECRET_PASSWORD', "cGFzc3dvcmQ="
environment 'KESTRA_TEST1', "true"
environment 'KESTRA_TEST2', "Pass by env"
environment 'ENV_TEST1', "true"
environment 'ENV_TEST2', "Pass by env"
}
testlogger {
@@ -279,7 +282,7 @@ subprojects {
}
dependencies {
agent "org.aspectj:aspectjweaver:1.9.23"
agent "org.aspectj:aspectjweaver:1.9.24"
}
test {

View File

@@ -36,5 +36,5 @@ dependencies {
implementation project(":webserver")
//test
testImplementation "org.wiremock:wiremock"
testImplementation "org.wiremock:wiremock-jetty12"
}

View File

@@ -1,6 +1,6 @@
package io.kestra.cli.services;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithPath;
import io.kestra.core.models.flows.FlowWithSource;
@@ -236,7 +236,7 @@ public class FileChangedEventListener {
FlowWithSource flow = pluginDefaultService.parseFlowWithAllDefaults(tenantId, content, false);
modelValidator.validate(flow);
return Optional.of(flow);
} catch (DeserializationException | ConstraintViolationException e) {
} catch (ConstraintViolationException | FlowProcessingException e) {
log.warn("Error while parsing flow: {}", entry, e);
}
return Optional.empty();

View File

@@ -168,7 +168,7 @@ kestra:
values:
recoverMissedSchedules: ALL
variables:
env-vars-prefix: KESTRA_
env-vars-prefix: ENV_
cache-enabled: true
cache-size: 1000

View File

@@ -13,8 +13,7 @@ import picocli.CommandLine;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
class AppTest {
@@ -26,7 +25,7 @@ class AppTest {
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
PicocliRunner.call(App.class, ctx, "--help");
assertThat(out.toString(), containsString("kestra"));
assertThat(out.toString()).contains("kestra");
}
}
@@ -42,7 +41,7 @@ class AppTest {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(args);
assertTrue(ctx.getProperty("kestra.server-type", ServerType.class).isEmpty());
assertThat(out.toString(), startsWith("Usage: kestra server " + serverType));
assertThat(out.toString()).startsWith("Usage: kestra server " + serverType);
}
}
@@ -56,9 +55,9 @@ class AppTest {
try (ApplicationContext ctx = App.applicationContext(App.class, argsWithMissingParams)) {
new CommandLine(App.class, new MicronautFactory(ctx)).execute(argsWithMissingParams);
assertThat(out.toString(), startsWith("Missing required parameters: "));
assertThat(out.toString(), containsString("Usage: kestra flow namespace update "));
assertThat(out.toString(), not(containsString("MissingParameterException: ")));
assertThat(out.toString()).startsWith("Missing required parameters: ");
assertThat(out.toString()).contains("Usage: kestra flow namespace update ");
assertThat(out.toString()).doesNotContain("MissingParameterException: ");
}
}
}

View File

@@ -8,8 +8,7 @@ import org.junit.jupiter.api.Test;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.assertj.core.api.Assertions.assertThat;
class ServerCommandValidatorTest {
@@ -40,8 +39,8 @@ class ServerCommandValidatorTest {
.start()
);
final Throwable rootException = getRootException(exception);
assertThat(rootException.getClass(), is(ServerCommandValidator.ServerCommandException.class));
assertThat(rootException.getMessage(), is("Incomplete server configuration - missing required properties"));
assertThat(rootException.getClass()).isEqualTo(ServerCommandValidator.ServerCommandException.class);
assertThat(rootException.getMessage()).isEqualTo("Incomplete server configuration - missing required properties");
}
private Throwable getRootException(Throwable exception) {

View File

@@ -8,8 +8,7 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
import static org.assertj.core.api.Assertions.assertThat;
class ConfigPropertiesCommandTest {
@Test
@@ -20,8 +19,8 @@ class ConfigPropertiesCommandTest {
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
PicocliRunner.call(ConfigPropertiesCommand.class, ctx);
assertThat(out.toString(), containsString("activeEnvironments:"));
assertThat(out.toString(), containsString("- test"));
assertThat(out.toString()).contains("activeEnvironments:");
assertThat(out.toString()).contains("- test");
}
}
}

View File

@@ -11,9 +11,7 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.assertj.core.api.Assertions.assertThat;
class FlowCreateOrUpdateCommandTest {
@RetryingTest(5) // flaky on CI but cannot be reproduced even with 100 repetitions
@@ -38,7 +36,7 @@ class FlowCreateOrUpdateCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString(), containsString("4 flow(s)"));
assertThat(out.toString()).contains("4 flow(s)");
out.reset();
args = new String[]{
@@ -53,7 +51,7 @@ class FlowCreateOrUpdateCommandTest {
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
// 2 delete + 1 update
assertThat(out.toString(), containsString("4 flow(s)"));
assertThat(out.toString()).contains("4 flow(s)");
}
}
@@ -80,7 +78,7 @@ class FlowCreateOrUpdateCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString(), containsString("4 flow(s)"));
assertThat(out.toString()).contains("4 flow(s)");
out.reset();
// no "delete" arg should behave as no-delete
@@ -93,7 +91,7 @@ class FlowCreateOrUpdateCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString(), containsString("1 flow(s)"));
assertThat(out.toString()).contains("1 flow(s)");
out.reset();
args = new String[]{
@@ -106,7 +104,7 @@ class FlowCreateOrUpdateCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString(), containsString("1 flow(s)"));
assertThat(out.toString()).contains("1 flow(s)");
}
}
@@ -131,8 +129,8 @@ class FlowCreateOrUpdateCommandTest {
};
Integer call = PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), containsString("1 flow(s)"));
assertThat(call).isZero();
assertThat(out.toString()).contains("1 flow(s)");
}
}
}

View File

@@ -9,9 +9,7 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.assertj.core.api.Assertions.assertThat;
class FlowDotCommandTest {
@Test
@@ -26,8 +24,8 @@ class FlowDotCommandTest {
};
Integer call = PicocliRunner.call(FlowDotCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), containsString("\"root.date\"[shape=box];"));
assertThat(call).isZero();
assertThat(out.toString()).contains("\"root.date\"[shape=box];");
}
}
}

View File

@@ -7,8 +7,7 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.assertj.core.api.Assertions.assertThat;
class FlowExpandCommandTest {
@SuppressWarnings("deprecation")
@@ -23,22 +22,20 @@ class FlowExpandCommandTest {
};
Integer call = PicocliRunner.call(FlowExpandCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), is(
"id: include\n" +
"namespace: io.kestra.cli\n" +
"\n" +
"# The list of tasks\n" +
"tasks:\n" +
"- id: t1\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: \"Lorem ipsum dolor sit amet\"\n" +
"- id: t2\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: |\n" +
" Lorem ipsum dolor sit amet\n" +
" Lorem ipsum dolor sit amet\n"
));
assertThat(call).isZero();
assertThat(out.toString()).isEqualTo("id: include\n" +
"namespace: io.kestra.cli\n" +
"\n" +
"# The list of tasks\n" +
"tasks:\n" +
"- id: t1\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: \"Lorem ipsum dolor sit amet\"\n" +
"- id: t2\n" +
" type: io.kestra.plugin.core.debug.Return\n" +
" format: |\n" +
" Lorem ipsum dolor sit amet\n" +
" Lorem ipsum dolor sit amet\n");
}
}
}

View File

@@ -14,10 +14,7 @@ import java.io.PrintStream;
import java.net.URL;
import java.util.zip.ZipFile;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.assertj.core.api.Assertions.assertThat;
class FlowExportCommandTest {
@Test
@@ -42,7 +39,7 @@ class FlowExportCommandTest {
directory.getPath(),
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, updateArgs);
assertThat(out.toString(), containsString("3 flow(s)"));
assertThat(out.toString()).contains("3 flow(s)");
// then we export them
String[] exportArgs = {
@@ -58,11 +55,11 @@ class FlowExportCommandTest {
};
PicocliRunner.call(FlowExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/flows.zip");
assertThat(file.exists(), is(true));
assertThat(file.exists()).isTrue();
ZipFile zipFile = new ZipFile(file);
// When launching the test in a suite, there are 4 flows but when launching individually there are only 3
assertThat(zipFile.stream().count(), greaterThanOrEqualTo(3L));
assertThat(zipFile.stream().count()).isGreaterThanOrEqualTo(3L);
file.delete();
}

View File

@@ -10,9 +10,7 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.assertj.core.api.Assertions.assertThat;
class FlowUpdatesCommandTest {
@Test
@@ -39,7 +37,7 @@ class FlowUpdatesCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString(), containsString("successfully updated !"));
assertThat(out.toString()).contains("successfully updated !");
out.reset();
args = new String[]{
@@ -56,7 +54,7 @@ class FlowUpdatesCommandTest {
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
// 2 delete + 1 update
assertThat(out.toString(), containsString("successfully updated !"));
assertThat(out.toString()).contains("successfully updated !");
}
}
@@ -85,7 +83,7 @@ class FlowUpdatesCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString(), containsString("4 flow(s)"));
assertThat(out.toString()).contains("4 flow(s)");
out.reset();
// no "delete" arg should behave as no-delete
@@ -100,7 +98,7 @@ class FlowUpdatesCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString(), containsString("1 flow(s)"));
assertThat(out.toString()).contains("1 flow(s)");
out.reset();
args = new String[]{
@@ -115,7 +113,7 @@ class FlowUpdatesCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString(), containsString("1 flow(s)"));
assertThat(out.toString()).contains("1 flow(s)");
}
}
@@ -144,7 +142,7 @@ class FlowUpdatesCommandTest {
};
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(out.toString(), containsString("Invalid entity: flow.namespace: io.kestra.outsider_quattro_-1 - flow namespace is invalid"));
assertThat(out.toString()).contains("Invalid entity: flow.namespace: io.kestra.outsider_quattro_-1 - flow namespace is invalid");
}
}
@@ -171,8 +169,8 @@ class FlowUpdatesCommandTest {
};
Integer call = PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), containsString("1 flow(s)"));
assertThat(call).isZero();
assertThat(out.toString()).contains("1 flow(s)");
}
}
}

View File

@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.assertj.core.api.Assertions.assertThat;
class FlowValidateCommandTest {
@Test
@@ -24,8 +22,8 @@ class FlowValidateCommandTest {
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), containsString("✓ - io.kestra.cli / include"));
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - io.kestra.cli / include");
}
}
@@ -41,10 +39,10 @@ class FlowValidateCommandTest {
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), containsString("✓ - system / warning"));
assertThat(out.toString(), containsString("⚠ - tasks[0] is deprecated"));
assertThat(out.toString(), containsString(" - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log"));
assertThat(call).isZero();
assertThat(out.toString()).contains("✓ - system / warning");
assertThat(out.toString()).contains("⚠ - tasks[0] is deprecated");
assertThat(out.toString()).contains(" - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log");
}
}
}

View File

@@ -10,8 +10,7 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
import static org.assertj.core.api.Assertions.assertThat;
public class SingleFlowCommandsTest {
@@ -37,7 +36,7 @@ public class SingleFlowCommandsTest {
};
PicocliRunner.call(FlowDeleteCommand.class, ctx, deleteArgs);
assertThat(out.toString(), containsString("Flow successfully deleted !"));
assertThat(out.toString()).contains("Flow successfully deleted !");
out.reset();
String[] createArgs = {
@@ -49,7 +48,7 @@ public class SingleFlowCommandsTest {
};
PicocliRunner.call(FlowCreateCommand.class, ctx, createArgs);
assertThat(out.toString(), containsString("Flow successfully created !"));
assertThat(out.toString()).contains("Flow successfully created !");
out.reset();
String[] updateArgs = {
@@ -63,7 +62,7 @@ public class SingleFlowCommandsTest {
};
PicocliRunner.call(FlowUpdateCommand.class, ctx, updateArgs);
assertThat(out.toString(), containsString("Flow successfully updated !"));
assertThat(out.toString()).contains("Flow successfully updated !");
out.reset();
}
}

View File

@@ -10,9 +10,7 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateValidateCommandTest {
@Test
@@ -28,9 +26,9 @@ class TemplateValidateCommandTest {
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call, is(1));
assertThat(out.toString(), containsString("Unable to parse flow"));
assertThat(out.toString(), containsString("must not be empty"));
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
@@ -56,9 +54,9 @@ class TemplateValidateCommandTest {
};
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);
assertThat(call, is(1));
assertThat(out.toString(), containsString("Unable to parse flow"));
assertThat(out.toString(), containsString("must not be empty"));
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flow");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.assertj.core.api.Assertions.assertThat;
class FlowNamespaceCommandTest {
@Test
@@ -21,8 +19,8 @@ class FlowNamespaceCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(FlowNamespaceCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra flow namespace"));
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra flow namespace");
}
}
}

View File

@@ -10,10 +10,7 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.StringContains.containsString;
import static org.hamcrest.core.Is.is;
import static org.assertj.core.api.Assertions.assertThat;
class FlowNamespaceUpdateCommandTest {
@Test
@@ -39,7 +36,7 @@ class FlowNamespaceUpdateCommandTest {
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString(), containsString("namespace 'io.kestra.cli' successfully updated"));
assertThat(out.toString()).contains("namespace 'io.kestra.cli' successfully updated");
out.reset();
args = new String[]{
@@ -55,7 +52,7 @@ class FlowNamespaceUpdateCommandTest {
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
// 2 delete + 1 update
assertThat(out.toString(), containsString("namespace 'io.kestra.cli' successfully updated"));
assertThat(out.toString()).contains("namespace 'io.kestra.cli' successfully updated");
}
}
@@ -81,9 +78,9 @@ class FlowNamespaceUpdateCommandTest {
};
Integer call = PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(call, is(1));
assertThat(out.toString(), containsString("Unable to parse flows"));
assertThat(out.toString(), containsString("must not be empty"));
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse flows");
assertThat(out.toString()).contains("must not be empty");
}
}
@@ -111,7 +108,7 @@ class FlowNamespaceUpdateCommandTest {
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString(), containsString("3 flow(s)"));
assertThat(out.toString()).contains("3 flow(s)");
out.reset();
// no "delete" arg should behave as no-delete
@@ -125,7 +122,7 @@ class FlowNamespaceUpdateCommandTest {
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString(), containsString("1 flow(s)"));
assertThat(out.toString()).contains("1 flow(s)");
out.reset();
args = new String[]{
@@ -139,7 +136,7 @@ class FlowNamespaceUpdateCommandTest {
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString(), containsString("1 flow(s)"));
assertThat(out.toString()).contains("1 flow(s)");
}
}
@@ -165,8 +162,8 @@ class FlowNamespaceUpdateCommandTest {
};
Integer call = PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), containsString("1 flow(s)"));
assertThat(call).isZero();
assertThat(out.toString()).contains("1 flow(s)");
}
}
@@ -195,8 +192,8 @@ class FlowNamespaceUpdateCommandTest {
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString(), containsString("io.kestra.override"));
assertThat(out.toString(), not(containsString("io.kestra.cli")));
assertThat(out.toString()).contains("io.kestra.override");
assertThat(out.toString()).doesNotContain("io.kestra.cli");
}
}

View File

@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.assertj.core.api.Assertions.assertThat;
class NamespaceCommandTest {
@Test
@@ -21,8 +19,8 @@ class NamespaceCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(NamespaceCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra namespace"));
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra namespace");
}
}
}

View File

@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.assertj.core.api.Assertions.assertThat;
class NamespaceFilesCommandTest {
@Test
@@ -21,8 +19,8 @@ class NamespaceFilesCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(NamespaceFilesCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra namespace files"));
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra namespace files");
}
}
}

View File

@@ -14,8 +14,8 @@ import java.net.URISyntaxException;
import java.net.URL;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.StringContains.containsString;
class NamespaceFilesUpdateCommandTest {
@Test

View File

@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.assertj.core.api.Assertions.assertThat;
class KvCommandTest {
@Test
@@ -21,8 +19,8 @@ class KvCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(KvCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra namespace kv"));
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra namespace kv");
}
}
}

View File

@@ -16,8 +16,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.assertj.core.api.Assertions.assertThat;
class KvUpdateCommandTest {
@Test
@@ -43,8 +42,8 @@ class KvUpdateCommandTest {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
assertThat(kvStore.getValue("string").get(), is(new KVValue("stringValue")));
assertThat(((InternalKVStore)kvStore).getRawValue("string").get(), is("\"stringValue\""));
assertThat(kvStore.getValue("string").get()).isEqualTo(new KVValue("stringValue"));
assertThat(((InternalKVStore) kvStore).getRawValue("string").get()).isEqualTo("\"stringValue\"");
}
}
@@ -71,8 +70,8 @@ class KvUpdateCommandTest {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
assertThat(kvStore.getValue("int").get(), is(new KVValue(1)));
assertThat(((InternalKVStore)kvStore).getRawValue("int").get(), is("1"));
assertThat(kvStore.getValue("int").get()).isEqualTo(new KVValue(1));
assertThat(((InternalKVStore) kvStore).getRawValue("int").get()).isEqualTo("1");
}
}
@@ -101,8 +100,8 @@ class KvUpdateCommandTest {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
assertThat(kvStore.getValue("intStr").get(), is(new KVValue("1")));
assertThat(((InternalKVStore)kvStore).getRawValue("intStr").get(), is("\"1\""));
assertThat(kvStore.getValue("intStr").get()).isEqualTo(new KVValue("1"));
assertThat(((InternalKVStore) kvStore).getRawValue("intStr").get()).isEqualTo("\"1\"");
}
}
@@ -129,8 +128,8 @@ class KvUpdateCommandTest {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
assertThat(kvStore.getValue("object").get(), is(new KVValue(Map.of("some", "json"))));
assertThat(((InternalKVStore)kvStore).getRawValue("object").get(), is("{some:\"json\"}"));
assertThat(kvStore.getValue("object").get()).isEqualTo(new KVValue(Map.of("some", "json")));
assertThat(((InternalKVStore) kvStore).getRawValue("object").get()).isEqualTo("{some:\"json\"}");
}
}
@@ -159,8 +158,8 @@ class KvUpdateCommandTest {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
assertThat(kvStore.getValue("objectStr").get(), is(new KVValue("{\"some\":\"json\"}")));
assertThat(((InternalKVStore)kvStore).getRawValue("objectStr").get(), is("\"{\\\"some\\\":\\\"json\\\"}\""));
assertThat(kvStore.getValue("objectStr").get()).isEqualTo(new KVValue("{\"some\":\"json\"}"));
assertThat(((InternalKVStore) kvStore).getRawValue("objectStr").get()).isEqualTo("\"{\\\"some\\\":\\\"json\\\"}\"");
}
}
@@ -193,8 +192,8 @@ class KvUpdateCommandTest {
KVStoreService kvStoreService = ctx.getBean(KVStoreService.class);
KVStore kvStore = kvStoreService.get(null, "io.kestra.cli", null);
assertThat(kvStore.getValue("objectFromFile").get(), is(new KVValue(Map.of("some", "json", "from", "file"))));
assertThat(((InternalKVStore)kvStore).getRawValue("objectFromFile").get(), is("{some:\"json\",from:\"file\"}"));
assertThat(kvStore.getValue("objectFromFile").get()).isEqualTo(new KVValue(Map.of("some", "json", "from", "file")));
assertThat(((InternalKVStore) kvStore).getRawValue("objectFromFile").get()).isEqualTo("{some:\"json\",from:\"file\"}");
}
}
}

View File

@@ -8,8 +8,7 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
import static org.assertj.core.api.Assertions.assertThat;
class PluginCommandTest {
@@ -21,7 +20,7 @@ class PluginCommandTest {
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
PicocliRunner.call(PluginCommand.class, ctx);
assertThat(out.toString(), containsString("Usage: kestra plugins"));
assertThat(out.toString()).contains("Usage: kestra plugins");
}
}
}

View File

@@ -17,8 +17,7 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.assertj.core.api.Assertions.assertThat;
class PluginDocCommandTest {
@@ -44,16 +43,16 @@ class PluginDocCommandTest {
List<Path> files = Files.list(docPath).toList();
assertThat(files.size(), is(1));
assertThat(files.getFirst().getFileName().toString(), is("plugin-template-test"));
assertThat(files.size()).isEqualTo(1);
assertThat(files.getFirst().getFileName().toString()).isEqualTo("plugin-template-test");
var directory = files.getFirst().toFile();
assertThat(directory.isDirectory(), is(true));
assertThat(directory.listFiles().length, is(3));
assertThat(directory.isDirectory()).isTrue();
assertThat(directory.listFiles().length).isEqualTo(3);
var readme = directory.toPath().resolve("index.md");
var readmeContent = new String(Files.readAllBytes(readme));
assertThat(readmeContent, containsString("""
assertThat(readmeContent).contains("""
---
title: Template test
description: "Plugin template for Kestra"
@@ -61,18 +60,17 @@ class PluginDocCommandTest {
---
# Template test
"""));
""");
assertThat(readmeContent, containsString("""
assertThat(readmeContent).contains("""
Plugin template for Kestra
This is a more complex description of the plugin.
This is in markdown and will be inline inside the plugin page.
"""));
""");
assertThat(readmeContent, containsString(
"""
assertThat(readmeContent).contains("""
/> Subgroup title
Subgroup description
@@ -89,20 +87,20 @@ class PluginDocCommandTest {
\s
* [Reporting](./guides/reporting.md)
\s
"""));
""");
// check @PluginProperty from an interface
var task = directory.toPath().resolve("tasks/io.kestra.plugin.templates.ExampleTask.md");
String taskDoc = new String(Files.readAllBytes(task));
assertThat(taskDoc, containsString("""
assertThat(taskDoc).contains("""
### `example`
* **Type:** ==string==
* **Dynamic:** ✔️
* **Required:** ❌
**Example interface**
"""));
assertThat(taskDoc, containsString("""
""");
assertThat(taskDoc).contains("""
### `from`
* **Type:**
* ==string==
@@ -110,12 +108,12 @@ class PluginDocCommandTest {
* [==Example==](#io.kestra.core.models.annotations.example)
* **Dynamic:** ✔️
* **Required:** ✔️
"""));
""");
var authenticationGuide = directory.toPath().resolve("guides/authentication.md");
assertThat(new String(Files.readAllBytes(authenticationGuide)), containsString("This is how to authenticate for this plugin:"));
assertThat(new String(Files.readAllBytes(authenticationGuide))).contains("This is how to authenticate for this plugin:");
var reportingGuide = directory.toPath().resolve("guides/reporting.md");
assertThat(new String(Files.readAllBytes(reportingGuide)), containsString("This is the reporting of the plugin:"));
assertThat(new String(Files.readAllBytes(reportingGuide))).contains("This is the reporting of the plugin:");
}
}
}

View File

@@ -10,8 +10,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.assertj.core.api.Assertions.assertThat;
class PluginInstallCommandTest {
@@ -26,8 +25,8 @@ class PluginInstallCommandTest {
List<Path> files = Files.list(pluginsPath).toList();
assertThat(files.size(), is(1));
assertThat(files.getFirst().getFileName().toString(), is("io_kestra_plugin__plugin-notifications__0_6_0.jar"));
assertThat(files.size()).isEqualTo(1);
assertThat(files.getFirst().getFileName().toString()).isEqualTo("io_kestra_plugin__plugin-notifications__0_6_0.jar");
}
}
@@ -42,9 +41,9 @@ class PluginInstallCommandTest {
List<Path> files = Files.list(pluginsPath).toList();
assertThat(files.size(), is(1));
assertThat(files.getFirst().getFileName().toString(), startsWith("io_kestra_plugin__plugin-notifications__"));
assertThat(files.getFirst().getFileName().toString(), not(containsString("LATEST")));
assertThat(files.size()).isEqualTo(1);
assertThat(files.getFirst().getFileName().toString()).startsWith("io_kestra_plugin__plugin-notifications__");
assertThat(files.getFirst().getFileName().toString()).doesNotContain("LATEST");
}
}
@@ -60,8 +59,8 @@ class PluginInstallCommandTest {
List<Path> files = Files.list(pluginsPath).toList();
assertThat(files.size(), is(1));
assertThat(files.getFirst().getFileName().toString(), is("io_kestra_storage__storage-s3__0_12_1.jar"));
assertThat(files.size()).isEqualTo(1);
assertThat(files.getFirst().getFileName().toString()).isEqualTo("io_kestra_storage__storage-s3__0_12_1.jar");
}
}
}

View File

@@ -16,8 +16,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
import static org.assertj.core.api.Assertions.assertThat;
class PluginListCommandTest {
@@ -41,7 +40,7 @@ class PluginListCommandTest {
String[] args = {"--plugins", pluginsPath.toAbsolutePath().toString()};
PicocliRunner.call(PluginListCommand.class, ctx, args);
assertThat(out.toString(), containsString("io.kestra.plugin.templates.Example"));
assertThat(out.toString()).contains("io.kestra.plugin.templates.Example");
}
}
}

View File

@@ -13,8 +13,7 @@ import java.io.PrintStream;
import java.util.Map;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.assertj.core.api.Assertions.assertThat;
@WireMockTest(httpPort = 28181)
class PluginSearchCommandTest {
@@ -61,9 +60,9 @@ class PluginSearchCommandTest {
PicocliRunner.call(PluginSearchCommand.class, ctx, args);
String output = outputStreamCaptor.toString().trim();
assertThat(output, containsString("Found 1 plugins matching 'notifications'"));
assertThat(output, containsString("plugin-notifications"));
assertThat(output, not(containsString("plugin-scripts")));
assertThat(output).contains("Found 1 plugins matching 'notifications'");
assertThat(output).contains("plugin-notifications");
assertThat(output).doesNotContain("plugin-scripts");
}
}
@@ -97,9 +96,9 @@ class PluginSearchCommandTest {
PicocliRunner.call(PluginSearchCommand.class, ctx, args);
String output = outputStreamCaptor.toString().trim();
assertThat(output, containsString("Found 2 plugins"));
assertThat(output, containsString("plugin-notifications"));
assertThat(output, containsString("plugin-scripts"));
assertThat(output).contains("Found 2 plugins");
assertThat(output).contains("plugin-notifications");
assertThat(output).contains("plugin-scripts");
}
}
}

View File

@@ -11,9 +11,7 @@ import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.assertj.core.api.Assertions.assertThat;
class ReindexCommandTest {
@Test
@@ -36,7 +34,7 @@ class ReindexCommandTest {
directory.getPath(),
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, updateArgs);
assertThat(out.toString(), containsString("3 flow(s)"));
assertThat(out.toString()).contains("3 flow(s)");
// then we reindex them
String[] reindexArgs = {
@@ -44,9 +42,9 @@ class ReindexCommandTest {
"flow",
};
Integer call = PicocliRunner.call(ReindexCommand.class, ctx, reindexArgs);
assertThat(call, is(0));
assertThat(call).isZero();
// locally it reindexes 3 flows, but in CI 4 for an unknown reason
assertThat(out.toString(), containsString("Successfully reindex"));
assertThat(out.toString()).contains("Successfully reindex");
}
}
}

View File

@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.assertj.core.api.Assertions.assertThat;
class DatabaseCommandTest {
@Test
@@ -21,8 +19,8 @@ class DatabaseCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(DatabaseCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra sys database"));
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra sys database");
}
}
}

View File

@@ -8,9 +8,7 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreCommandTest {
@Test
@@ -22,8 +20,8 @@ class StateStoreCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(StateStoreCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra sys state-store"));
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra sys state-store");
}
}
}

View File

@@ -25,8 +25,7 @@ import java.net.URI;
import java.util.List;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.assertj.core.api.Assertions.assertThat;
class StateStoreMigrateCommandTest {
@Test
@@ -54,10 +53,7 @@ class StateStoreMigrateCommandTest {
oldStateStoreUri,
new ByteArrayInputStream("my-value".getBytes())
);
assertThat(
storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri),
is(true)
);
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
"tenantId", tenantId,
@@ -70,13 +66,10 @@ class StateStoreMigrateCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(StateStoreMigrateCommand.class, ctx, args);
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes()), is("my-value"));
assertThat(
storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri),
is(false)
);
assertThat(new String(stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value").readAllBytes())).isEqualTo("my-value");
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isFalse();
assertThat(call, is(0));
assertThat(call).isZero();
}
}
}

View File

@@ -15,9 +15,7 @@ import java.net.URL;
import java.util.Map;
import java.util.zip.ZipFile;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
import static org.hamcrest.core.Is.is;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateExportCommandTest {
@Test
@@ -42,7 +40,7 @@ class TemplateExportCommandTest {
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString(), containsString("3 template(s)"));
assertThat(out.toString()).contains("3 template(s)");
// then we export them
String[] exportArgs = {
@@ -56,9 +54,9 @@ class TemplateExportCommandTest {
};
PicocliRunner.call(TemplateExportCommand.class, ctx, exportArgs);
File file = new File("/tmp/templates.zip");
assertThat(file.exists(), is(true));
assertThat(file.exists()).isTrue();
ZipFile zipFile = new ZipFile(file);
assertThat(zipFile.stream().count(), is(3L));
assertThat(zipFile.stream().count()).isEqualTo(3L);
file.delete();
}

View File

@@ -11,11 +11,9 @@ import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.assertj.core.api.Assertions.assertThat;
public class TemplateValidateCommandTest {
class TemplateValidateCommandTest {
@Test
void runLocal() {
URL directory = TemplateValidateCommandTest.class.getClassLoader().getResource("invalidsTemplates/template.yml");
@@ -29,9 +27,9 @@ public class TemplateValidateCommandTest {
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call, is(1));
assertThat(out.toString(), containsString("Unable to parse template"));
assertThat(out.toString(), containsString("must not be empty"));
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
@@ -55,9 +53,9 @@ public class TemplateValidateCommandTest {
};
Integer call = PicocliRunner.call(TemplateValidateCommand.class, ctx, args);
assertThat(call, is(1));
assertThat(out.toString(), containsString("Unable to parse template"));
assertThat(out.toString(), containsString("must not be empty"));
assertThat(call).isEqualTo(1);
assertThat(out.toString()).contains("Unable to parse template");
assertThat(out.toString()).contains("must not be empty");
}
}
}

View File

@@ -7,9 +7,7 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceCommandTest {
@Test
@@ -21,8 +19,8 @@ class TemplateNamespaceCommandTest {
String[] args = {};
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);
assertThat(call, is(0));
assertThat(out.toString(), containsString("Usage: kestra template namespace"));
assertThat(call).isZero();
assertThat(out.toString()).contains("Usage: kestra template namespace");
}
}
}

View File

@@ -11,8 +11,7 @@ import java.io.PrintStream;
import java.net.URL;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringContains.containsString;
import static org.assertj.core.api.Assertions.assertThat;
class TemplateNamespaceUpdateCommandTest {
@Test
@@ -37,7 +36,7 @@ class TemplateNamespaceUpdateCommandTest {
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString(), containsString("3 template(s)"));
assertThat(out.toString()).contains("3 template(s)");
}
}
@@ -64,8 +63,8 @@ class TemplateNamespaceUpdateCommandTest {
Integer call = PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
// assertThat(call, is(1));
assertThat(out.toString(), containsString("Unable to parse templates"));
assertThat(out.toString(), containsString("must not be empty"));
assertThat(out.toString()).contains("Unable to parse templates");
assertThat(out.toString()).contains("must not be empty");
}
}
@@ -93,7 +92,7 @@ class TemplateNamespaceUpdateCommandTest {
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, args);
assertThat(out.toString(), containsString("3 template(s)"));
assertThat(out.toString()).contains("3 template(s)");
String[] newArgs = {
"--server",
@@ -107,7 +106,7 @@ class TemplateNamespaceUpdateCommandTest {
};
PicocliRunner.call(TemplateNamespaceUpdateCommand.class, ctx, newArgs);
assertThat(out.toString(), containsString("1 template(s)"));
assertThat(out.toString()).contains("1 template(s)");
}
}
}

View File

@@ -10,8 +10,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.assertj.core.api.Assertions.assertThat;
class DeleteConfigurationApplicationListenersTest {
@@ -28,7 +27,7 @@ class DeleteConfigurationApplicationListenersTest {
);
try (ApplicationContext ctx = ApplicationContext.run(mapPropertySource, Environment.CLI, Environment.TEST)) {
assertThat(tempFile.exists(), is(false));
assertThat(tempFile.exists()).isFalse();
}
}
}

View File

@@ -19,8 +19,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.assertj.core.api.Assertions.assertThat;
@MicronautTest(environments = {"test", "file-watch"}, transactional = false)
class FileChangedEventListenerTest {
@@ -77,9 +76,9 @@ class FileChangedEventListenerTest {
Duration.ofSeconds(10)
);
Flow myflow = flowRepository.findById(null, "io.kestra.tests.watch", "myflow").orElseThrow();
assertThat(myflow.getTasks(), hasSize(1));
assertThat(myflow.getTasks().getFirst().getId(), is("hello"));
assertThat(myflow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));
assertThat(myflow.getTasks()).hasSize(1);
assertThat(myflow.getTasks().getFirst().getId()).isEqualTo("hello");
assertThat(myflow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
// delete the flow
Files.delete(Path.of(FILE_WATCH + "/myflow.yaml"));
@@ -116,9 +115,9 @@ class FileChangedEventListenerTest {
Duration.ofSeconds(10)
);
Flow pluginDefaultFlow = flowRepository.findById(null, "io.kestra.tests.watch", "pluginDefault").orElseThrow();
assertThat(pluginDefaultFlow.getTasks(), hasSize(1));
assertThat(pluginDefaultFlow.getTasks().getFirst().getId(), is("helloWithDefault"));
assertThat(pluginDefaultFlow.getTasks().getFirst().getType(), is("io.kestra.plugin.core.log.Log"));
assertThat(pluginDefaultFlow.getTasks()).hasSize(1);
assertThat(pluginDefaultFlow.getTasks().getFirst().getId()).isEqualTo("helloWithDefault");
assertThat(pluginDefaultFlow.getTasks().getFirst().getType()).isEqualTo("io.kestra.plugin.core.log.Log");
// delete both files
Files.delete(Path.of(FILE_WATCH + "/plugin-default.yaml"));

View File

@@ -4,6 +4,8 @@ import com.fasterxml.classmate.ResolvedType;
import com.fasterxml.classmate.members.HierarchicType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
@@ -47,10 +49,18 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
@Singleton
public class JsonSchemaGenerator {
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
private static final ObjectMapper MAPPER = JacksonMapper.ofJson().copy()
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml().copy()
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
private final PluginRegistry pluginRegistry;
@Inject
@@ -92,7 +102,7 @@ public class JsonSchemaGenerator {
pullDocumentationAndDefaultFromAnyOf(objectNode);
removeRequiredOnPropsWithDefaults(objectNode);
return JacksonMapper.toMap(objectNode);
return MAPPER.convertValue(objectNode, MAP_TYPE_REFERENCE);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unable to generate jsonschema for '" + cls.getName() + "'", e);
}
@@ -176,7 +186,7 @@ public class JsonSchemaGenerator {
try {
sb.append("Default value is : `")
.append(JacksonMapper.ofYaml().writeValueAsString(collectedTypeAttributes.get("default")).trim())
.append(YAML_MAPPER.writeValueAsString(collectedTypeAttributes.get("default")).trim())
.append("`");
} catch (JsonProcessingException ignored) {
@@ -216,6 +226,7 @@ public class JsonSchemaGenerator {
}
protected void build(SchemaGeneratorConfigBuilder builder, boolean draft7) {
// builder.withObjectMapper(builder.getObjectMapper().configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false));
builder
.with(new JakartaValidationModule(
JakartaValidationOption.NOT_NULLABLE_METHOD_IS_REQUIRED,
@@ -645,7 +656,7 @@ public class JsonSchemaGenerator {
pullDocumentationAndDefaultFromAnyOf(objectNode);
removeRequiredOnPropsWithDefaults(objectNode);
return JacksonMapper.toMap(extractMainRef(objectNode));
return MAPPER.convertValue(extractMainRef(objectNode), MAP_TYPE_REFERENCE);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unable to generate jsonschema for '" + cls.getName() + "'", e);
}
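
The two dedicated mappers introduced above disable WRITE_DURATIONS_AS_TIMESTAMPS so that Duration default values are rendered in ISO-8601 form in the generated schema. A standalone sketch of the effect, using plain Jackson rather than the project's JacksonMapper:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.time.Duration;

class DurationSerializationSketch {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper()
            .registerModule(new JavaTimeModule())
            // Same toggle as the schema generator's MAPPER and YAML_MAPPER above.
            .configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);

        // Prints "PT30S" instead of the numeric 30.000000000 produced by the default setting.
        System.out.println(mapper.writeValueAsString(Duration.ofSeconds(30)));
    }
}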

View File

@@ -0,0 +1,24 @@
package io.kestra.core.exceptions;
import java.io.Serial;
/**
* Exception class for all problems encountered when processing (parsing, injecting defaults, validating) a flow.
*/
public class FlowProcessingException extends KestraException {
@Serial
private static final long serialVersionUID = 1L;
public FlowProcessingException(String message) {
super(message);
}
public FlowProcessingException(String message, Throwable cause) {
super(message, cause);
}
public FlowProcessingException(Throwable cause) {
super(cause);
}
}
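
A brief, hypothetical sketch of how the new checked exception is meant to be consumed by callers; the validateSource helper is illustrative only and not part of this change:

import io.kestra.core.exceptions.FlowProcessingException;

class FlowProcessingExample {

    // Illustrative helper: rejects an empty flow source with the new exception type.
    static void validateSource(String source) throws FlowProcessingException {
        if (source == null || source.isBlank()) {
            throw new FlowProcessingException("Flow source must not be empty");
        }
    }

    public static void main(String[] args) {
        try {
            validateSource("");
        } catch (FlowProcessingException e) {
            System.err.println("Unable to process flow: " + e.getMessage());
        }
    }
}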

View File

@@ -0,0 +1,27 @@
package io.kestra.core.exceptions;
import java.io.Serial;
/**
* The top-level {@link KestraException}.
*/
public class KestraException extends Exception {
@Serial
private static final long serialVersionUID = 1L;
public KestraException() {
}
public KestraException(String message) {
super(message);
}
public KestraException(String message, Throwable cause) {
super(message, cause);
}
public KestraException(Throwable cause) {
super(cause);
}
}

View File

@@ -20,49 +20,88 @@ import org.apache.commons.lang3.ArrayUtils;
@Slf4j
public class MetricRegistry {
public static final String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending";
public static final String METRIC_WORKER_JOB_PENDING_COUNT_DESCRIPTION = "The number of jobs (tasks or triggers) pending to be run by the Worker";
public static final String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running";
public static final String METRIC_WORKER_JOB_RUNNING_COUNT_DESCRIPTION = "The number of jobs (tasks or triggers) currently running inside the Worker";
public static final String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread";
public static final String METRIC_WORKER_JOB_THREAD_COUNT_DESCRIPTION = "The number of worker threads";
public static final String METRIC_WORKER_RUNNING_COUNT = "worker.running.count";
public static final String METRIC_WORKER_RUNNING_COUNT_DESCRIPTION = "The number of tasks currently running inside the Worker";
public static final String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration";
public static final String METRIC_WORKER_QUEUED_DURATION_DESCRIPTION = "Task queued duration inside the Worker";
public static final String METRIC_WORKER_STARTED_COUNT = "worker.started.count";
public static final String METRIC_WORKER_STARTED_COUNT_DESCRIPTION = "The total number of tasks started by the Worker";
public static final String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
public static final String METRIC_WORKER_TIMEOUT_COUNT_DESCRIPTION = "The total number of tasks that timeout inside the Worker";
public static final String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
public static final String METRIC_WORKER_ENDED_COUNT_DESCRIPTION = "The total number of tasks ended by the Worker";
public static final String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
public static final String METRIC_WORKER_ENDED_DURATION_DESCRIPTION = "Task run duration inside the Worker";
public static final String METRIC_WORKER_TRIGGER_DURATION = "worker.trigger.duration";
public static final String METRIC_WORKER_TRIGGER_DURATION_DESCRIPTION = "Trigger evaluation duration inside the Worker";
public static final String METRIC_WORKER_TRIGGER_RUNNING_COUNT = "worker.trigger.running.count";
public static final String METRIC_WORKER_TRIGGER_RUNNING_COUNT_DESCRIPTION = "The number of triggers currently evaluating inside the Worker";
public static final String METRIC_WORKER_TRIGGER_STARTED_COUNT = "worker.trigger.started.count";
public static final String METRIC_WORKER_TRIGGER_STARTED_COUNT_DESCRIPTION = "The total number of trigger evaluations started by the Worker";
public static final String METRIC_WORKER_TRIGGER_ENDED_COUNT = "worker.trigger.ended.count";
public static final String METRIC_WORKER_TRIGGER_ENDED_COUNT_DESCRIPTION = "The total number of trigger evaluations ended by the Worker";
public static final String METRIC_WORKER_TRIGGER_ERROR_COUNT = "worker.trigger.error.count";
public static final String METRIC_WORKER_TRIGGER_ERROR_COUNT_DESCRIPTION = "The total number of trigger evaluations that failed inside the Worker";
public static final String METRIC_WORKER_TRIGGER_EXECUTION_COUNT = "worker.trigger.execution.count";
public static final String METRIC_WORKER_TRIGGER_EXECUTION_COUNT_DESCRIPTION = "The total number of triggers evaluated by the Worker";
public static final String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
public static final String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public static final String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public static final String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count";
public static final String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public static final String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public static final String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";
public static final String METRIC_EXECUTOR_TASKRUN_CREATED_COUNT = "executor.taskrun.created.count";
public static final String METRIC_EXECUTOR_TASKRUN_CREATED_COUNT_DESCRIPTION = "The total number of tasks created by the Executor";
public static final String METRIC_EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public static final String METRIC_EXECUTOR_TASKRUN_ENDED_COUNT_DESCRIPTION = "The total number of tasks ended by the Executor";
public static final String METRIC_EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public static final String METRIC_EXECUTOR_TASKRUN_ENDED_DURATION_DESCRIPTION = "Task duration inside the Executor";
public static final String METRIC_EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public static final String METRIC_EXECUTOR_EXECUTION_STARTED_COUNT_DESCRIPTION = "The total number of executions started by the Executor";
public static final String METRIC_EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public static final String METRIC_EXECUTOR_EXECUTION_END_COUNT_DESCRIPTION = "The total number of executions ended by the Executor";
public static final String METRIC_EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";
public static final String METRIC_EXECUTOR_EXECUTION_DURATION_DESCRIPTION = "Execution duration inside the Executor";
public static final String METRIC_EXECUTOR_EXECUTION_MESSAGE_PROCESS_DURATION = "executor.execution.message.process";
public static final String METRIC_EXECUTOR_EXECUTION_MESSAGE_PROCESS_DURATION_DESCRIPTION = "Duration of a single execution message processed by the Executor";
public static final String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count";
public static final String METRIC_INDEXER_REQUEST_COUNT_DESCRIPTION = "Total number of batches of records received by the Indexer";
public static final String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration";
public static final String METRIC_INDEXER_REQUEST_DURATION_DESCRIPTION = "Batch of records duration inside the Indexer";
public static final String METRIC_INDEXER_REQUEST_RETRY_COUNT = "indexer.request.retry.count";
public static final String METRIC_INDEXER_REQUEST_RETRY_COUNT_DESCRIPTION = "Total number of batches of records retried by the Indexer";
public static final String METRIC_INDEXER_SERVER_DURATION = "indexer.server.duration";
public static final String METRIC_INDEXER_SERVER_DURATION_DESCRIPTION = "Batch of records indexation duration";
public static final String METRIC_INDEXER_MESSAGE_FAILED_COUNT = "indexer.message.failed.count";
public static final String METRIC_INDEXER_MESSAGE_FAILED_COUNT_DESCRIPTION = "Total number of records which failed to be indexed by the Indexer";
public static final String METRIC_INDEXER_MESSAGE_IN_COUNT = "indexer.message.in.count";
public static final String METRIC_INDEXER_MESSAGE_IN_COUNT_DESCRIPTION = "Total number of records received by the Indexer";
public static final String METRIC_INDEXER_MESSAGE_OUT_COUNT = "indexer.message.out.count";
public static final String METRIC_INDEXER_MESSAGE_OUT_COUNT_DESCRIPTION = "Total number of records indexed by the Indexer";
public static final String SCHEDULER_LOOP_COUNT = "scheduler.loop.count";
public static final String SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count";
public static final String SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration";
public static final String SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count";
public static final String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration";
public static final String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";
public static final String METRIC_SCHEDULER_LOOP_COUNT = "scheduler.loop.count";
public static final String METRIC_SCHEDULER_LOOP_COUNT_DESCRIPTION = "Total number of evaluation loops executed by the Scheduler";
public static final String METRIC_SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count";
public static final String METRIC_SCHEDULER_TRIGGER_COUNT_DESCRIPTION = "Total number of executions triggered by the Scheduler";
public static final String METRIC_SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration";
public static final String METRIC_SCHEDULER_TRIGGER_DELAY_DURATION_DESCRIPTION = "Trigger delay duration inside the Scheduler";
public static final String METRIC_SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count";
public static final String METRIC_SCHEDULER_EVALUATE_COUNT_DESCRIPTION = "Total number of triggers evaluated by the Scheduler";
public static final String METRIC_SCHEDULER_EXECUTION_LOCK_DURATION = "scheduler.execution.lock.duration";
public static final String METRIC_SCHEDULER_EXECUTION_LOCK_DURATION_DESCRIPTION = "Trigger lock duration waiting for an execution to be terminated";
public static final String METRIC_SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";
public static final String METRIC_SCHEDULER_EXECUTION_MISSING_DURATION_DESCRIPTION = "Missing execution duration inside the Scheduler. A missing execution is an execution that was triggered by the Scheduler but not yet started by the Executor";
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION = "scheduler.evaluation.loop.duration";
public static final String METRIC_SCHEDULER_EVALUATION_LOOP_DURATION_DESCRIPTION = "Trigger evaluation loop duration inside the Scheduler";
public static final String STREAMS_STATE_COUNT = "stream.state.count";
public static final String METRIC_STREAMS_STATE_COUNT = "stream.state.count";
public static final String METRIC_STREAMS_STATE_COUNT_DESCRIPTION = "Number of Kafka Stream applications by state";
public static final String JDBC_QUERY_DURATION = "jdbc.query.duration";
public static final String METRIC_JDBC_QUERY_DURATION = "jdbc.query.duration";
public static final String METRIC_JDBC_QUERY_DURATION_DESCRIPTION = "Duration of database queries";
public static final String QUEUE_BIG_MESSAGE_COUNT = "queue.big_message.count";
public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT = "queue.big_message.count";
public static final String METRIC_QUEUE_BIG_MESSAGE_COUNT_DESCRIPTION = "Total number of big messages";
public static final String TAG_TASK_TYPE = "task_type";
public static final String TAG_TRIGGER_TYPE = "trigger_type";
@@ -84,47 +123,64 @@ public class MetricRegistry {
* Tracks a monotonically increasing value.
*
* @param name The base metric name
* @param description The metric description
* @param tags MUST be an even number of arguments representing key/value pairs of tags.
* @return A new or existing counter.
*/
public Counter counter(String name, String... tags) {
return this.meterRegistry.counter(metricName(name), tags);
public Counter counter(String name, String description, String... tags) {
return Counter.builder(metricName(name))
.description(description)
.tags(tags)
.register(this.meterRegistry);
}
/**
* Register a gauge that reports the value of the {@link Number}.
*
* @param name Name of the gauge being registered.
* @param description The metric description
* @param number Thread-safe implementation of {@link Number} used to access the value.
* @param tags Sequence of dimensions for breaking down the name.
* @param <T> The type of the number from which the gauge value is extracted.
* @return The number that was passed in so the registration can be done as part of an assignment
* statement.
*/
public <T extends Number> T gauge(String name, T number, String... tags) {
return this.meterRegistry.gauge(metricName(name), Tags.of(tags), number);
public <T extends Number> T gauge(String name, String description, T number, String... tags) {
Gauge.builder(metricName(name), () -> number)
.description(description)
.tags(tags)
.register(this.meterRegistry);
return number;
}
/**
* Measures the time taken for short tasks and the count of these tasks.
*
* @param name The base metric name
* @param description The metric description
* @param tags MUST be an even number of arguments representing key/value pairs of tags.
* @return A new or existing timer.
*/
public Timer timer(String name, String... tags) {
return this.meterRegistry.timer(metricName(name), tags);
public Timer timer(String name, String description, String... tags) {
return Timer.builder(metricName(name))
.description(description)
.tags(tags)
.register(this.meterRegistry);
}
/**
* Measures the distribution of samples.
*
* @param name The base metric name
* @param description The metric description
* @param tags MUST be an even number of arguments representing key/value pairs of tags.
* @return A new or existing distribution summary.
*/
public DistributionSummary summary(String name, String... tags) {
return this.meterRegistry.summary(metricName(name), tags);
public DistributionSummary summary(String name, String description, String... tags) {
return DistributionSummary.builder(metricName(name))
.description(description)
.tags(tags)
.register(this.meterRegistry);
}
/**

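With the signatures above, every counter, gauge, timer and summary registration now carries a human-readable description. An illustrative sketch, assuming a MetricRegistry instance is available (for example injected by Micronaut):

import io.kestra.core.metrics.MetricRegistry;

import java.time.Duration;

class MetricRegistrySketch {

    // Illustrative only: the existing name and description constants are passed together.
    void record(MetricRegistry metricRegistry) {
        metricRegistry
            .counter(
                MetricRegistry.METRIC_WORKER_STARTED_COUNT,
                MetricRegistry.METRIC_WORKER_STARTED_COUNT_DESCRIPTION,
                MetricRegistry.TAG_TASK_TYPE, "io.kestra.plugin.core.log.Log")
            .increment();

        metricRegistry
            .timer(
                MetricRegistry.METRIC_WORKER_ENDED_DURATION,
                MetricRegistry.METRIC_WORKER_ENDED_DURATION_DESCRIPTION,
                MetricRegistry.TAG_TASK_TYPE, "io.kestra.plugin.core.log.Log")
            .record(Duration.ofMillis(250));
    }
}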
View File

@@ -7,6 +7,7 @@ import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -36,12 +37,15 @@ abstract public class AbstractMetricEntry<T> {
@NotNull
protected String name;
protected String description;
protected Map<String, String> tags;
protected Instant timestamp = Instant.now();
protected AbstractMetricEntry(@NotNull String name, String[] tags) {
protected AbstractMetricEntry(@NotNull String name, @Nullable String description, String[] tags) {
this.name = name;
this.description = description;
this.tags = tagsAsMap(tags);
}
@@ -79,7 +83,7 @@ abstract public class AbstractMetricEntry<T> {
abstract public T getValue();
abstract public void register(MetricRegistry meterRegistry, String prefix, Map<String, String> tags);
abstract public void register(MetricRegistry meterRegistry, String name, @Nullable String description, Map<String, String> tags);
abstract public void increment(T value);
}

View File

@@ -66,7 +66,7 @@ public class MetricEntry implements DeletedInterface, TenantInterface {
.taskId(taskRun.getTaskId())
.taskRunId(taskRun.getId())
.type(metricEntry.getType())
.name(metricEntry.name)
.name(metricEntry.getName())
.tags(metricEntry.getTags())
.value(computeValue(metricEntry))
.timestamp(metricEntry.getTimestamp())

View File

@@ -1,6 +1,7 @@
package io.kestra.core.models.executions.metrics;
import com.fasterxml.jackson.annotation.JsonInclude;
import jakarta.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -17,6 +18,7 @@ import java.util.Map;
@NoArgsConstructor
public final class Counter extends AbstractMetricEntry<Double> {
public static final String TYPE = "counter";
@NotNull
@JsonInclude
private final String type = TYPE;
@@ -25,32 +27,48 @@ public final class Counter extends AbstractMetricEntry<Double> {
@EqualsAndHashCode.Exclude
private Double value;
private Counter(@NotNull String name, @NotNull Double value, String... tags) {
super(name, tags);
private Counter(@NotNull String name, @Nullable String description, @NotNull Double value, String... tags) {
super(name, description, tags);
this.value = value;
}
public static Counter of(@NotNull String name, @NotNull Double value, String... tags) {
return new Counter(name, value, tags);
return new Counter(name, null, value, tags);
}
public static Counter of(@NotNull String name, @Nullable String description, @NotNull Double value, String... tags) {
return new Counter(name, description, value, tags);
}
public static Counter of(@NotNull String name, @NotNull Integer value, String... tags) {
return new Counter(name, (double) value, tags);
return new Counter(name, null, (double) value, tags);
}
public static Counter of(@NotNull String name, @Nullable String description, @NotNull Integer value, String... tags) {
return new Counter(name, description, (double) value, tags);
}
public static Counter of(@NotNull String name, @NotNull Long value, String... tags) {
return new Counter(name, (double) value, tags);
return new Counter(name, null, (double) value, tags);
}
public static Counter of(@NotNull String name, @Nullable String description, @NotNull Long value, String... tags) {
return new Counter(name, description, (double) value, tags);
}
public static Counter of(@NotNull String name, @NotNull Float value, String... tags) {
return new Counter(name, (double) value, tags);
return new Counter(name, null, (double) value, tags);
}
public static Counter of(@NotNull String name, @Nullable String description, @NotNull Float value, String... tags) {
return new Counter(name, description, (double) value, tags);
}
@Override
public void register(MetricRegistry meterRegistry, String prefix, Map<String, String> tags) {
public void register(MetricRegistry meterRegistry, String name, String description, Map<String, String> tags) {
meterRegistry
.counter(this.metricName(prefix), this.tagsAsArray(tags))
.counter(this.metricName(name), description, this.tagsAsArray(tags))
.increment(this.value);
}
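
Task authors can now pass an optional description when emitting a metric, while the description-less overloads remain. A hypothetical example (metric name, description, value and tags are illustrative):

import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.runners.RunContext;

class TaskMetricSketch {

    // Illustrative only: emits a counter that carries the new optional description.
    void emit(RunContext runContext) {
        runContext.metric(Counter.of(
            "records_processed",
            "Number of records processed by the task",
            42,
            "source", "s3"));
    }
}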

View File

@@ -1,6 +1,7 @@
package io.kestra.core.models.executions.metrics;
import com.fasterxml.jackson.annotation.JsonInclude;
import jakarta.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -27,20 +28,24 @@ public class Timer extends AbstractMetricEntry<Duration> {
@EqualsAndHashCode.Exclude
private Duration value;
private Timer(@NotNull String name, @NotNull Duration value, String... tags) {
super(name, tags);
private Timer(@NotNull String name, @Nullable String description, @NotNull Duration value, String... tags) {
super(name, description, tags);
this.value = value;
}
public static Timer of(@NotNull String name, @NotNull Duration value, String... tags) {
return new Timer(name, value, tags);
return new Timer(name, null, value, tags);
}
public static Timer of(@NotNull String name, @Nullable String description, @NotNull Duration value, String... tags) {
return new Timer(name, description, value, tags);
}
@Override
public void register(MetricRegistry meterRegistry, String prefix, Map<String, String> tags) {
public void register(MetricRegistry meterRegistry, String name, String description, Map<String, String> tags) {
meterRegistry
.timer(this.metricName(prefix), this.tagsAsArray(tags))
.timer(this.metricName(name), description, this.tagsAsArray(tags))
.record(this.value);
}

View File

@@ -39,30 +39,6 @@ import java.util.Optional;
@NoArgsConstructor
@JsonDeserialize
public class GenericFlow extends AbstractFlow implements HasUID {
private String id;
private String namespace;
private Integer revision;
private List<Input<?>> inputs;
private Map<String, Object> variables;
@Builder.Default
private boolean disabled = false;
@Builder.Default
private boolean deleted = false;
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(implementation = Object.class, oneOf = {List.class, Map.class})
private List<Label> labels;
private String tenantId;
private String source;
private List<SLA> sla;
@@ -84,7 +60,6 @@ public class GenericFlow extends AbstractFlow implements HasUID {
* @return a new {@link GenericFlow}
* @throws DeserializationException if source cannot be deserialized.
*/
@VisibleForTesting
public static GenericFlow of(final FlowInterface flow) throws DeserializationException {
return fromYaml(flow.getTenantId(), flow.sourceOrGenerateIfNull());
}

View File

@@ -11,7 +11,6 @@ import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import jakarta.validation.constraints.Size;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -26,6 +25,7 @@ import lombok.experimental.SuperBuilder;
@JsonSubTypes({
@JsonSubTypes.Type(value = ArrayInput.class, name = "ARRAY"),
@JsonSubTypes.Type(value = BooleanInput.class, name = "BOOLEAN"),
@JsonSubTypes.Type(value = BoolInput.class, name = "BOOL"),
@JsonSubTypes.Type(value = DateInput.class, name = "DATE"),
@JsonSubTypes.Type(value = DateTimeInput.class, name = "DATETIME"),
@JsonSubTypes.Type(value = DurationInput.class, name = "DURATION"),

View File

@@ -69,7 +69,7 @@ public class State {
public State withState(Type state) {
if (this.current == state) {
log.warn("Can't change state, already " + current);
log.warn("Can't change state, already {}", current);
return this;
}

View File

@@ -14,6 +14,7 @@ public enum Type {
INT(IntInput.class.getName()),
FLOAT(FloatInput.class.getName()),
BOOLEAN(BooleanInput.class.getName()),
BOOL(BoolInput.class.getName()),
DATETIME(DateTimeInput.class.getName()),
DATE(DateInput.class.getName()),
TIME(TimeInput.class.getName()),

View File

@@ -0,0 +1,17 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import jakarta.validation.ConstraintViolationException;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@Getter
@NoArgsConstructor
public class BoolInput extends Input<Boolean> {
@Override
public void validate(Boolean input) throws ConstraintViolationException {
// no validation yet
}
}

View File

@@ -10,6 +10,7 @@ import jakarta.validation.ConstraintViolationException;
@SuperBuilder
@Getter
@NoArgsConstructor
@Deprecated
public class BooleanInput extends Input<Boolean> {
@Override
public void validate(Boolean input) throws ConstraintViolationException {

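The BOOL entry added to the Type enum above maps to the new BoolInput, and BooleanInput is now deprecated in its favour. A minimal, illustrative sketch showing only that BoolInput currently performs no extra validation:

import io.kestra.core.models.flows.input.BoolInput;

class BoolInputSketch {

    public static void main(String[] args) {
        // Illustrative only: the SuperBuilder-generated builder plus the no-op validate.
        BoolInput input = BoolInput.builder().build();
        input.validate(Boolean.TRUE); // accepted: no validation yet, per the class above
    }
}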
View File

@@ -1,5 +1,6 @@
package io.kestra.core.models.namespaces;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;
import lombok.*;
@@ -11,6 +12,7 @@ import lombok.experimental.SuperBuilder;
@NoArgsConstructor
@ToString
@EqualsAndHashCode
@Schema(name = "NamespaceLight")
public class Namespace implements NamespaceInterface {
@NotNull
@Pattern(regexp="^[a-z0-9][a-z0-9._-]*")

View File

@@ -35,6 +35,8 @@ public abstract class AbstractMetric {
@NotNull
protected Property<String> name;
protected Property<String> description;
protected Property<Map<String, String>> tags;
@NotNull

View File

@@ -10,7 +10,6 @@ import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
@ToString
@@ -28,18 +27,15 @@ public class CounterMetric extends AbstractMetric {
@Override
public AbstractMetricEntry<?> toMetric(RunContext runContext) throws IllegalVariableEvaluationException {
Optional<String> name = runContext.render(this.name).as(String.class);
Optional<Double> value = runContext.render(this.value).as(Double.class);
String name = runContext.render(this.name).as(String.class).orElseThrow();
Double value = runContext.render(this.value).as(Double.class).orElseThrow();
String description = runContext.render(this.description).as(String.class).orElse(null);
Map<String, String> tags = runContext.render(this.tags).asMap(String.class, String.class);
String[] tagsAsStrings = tags.entrySet().stream()
.flatMap(e -> Stream.of(e.getKey(), e.getValue()))
.toArray(String[]::new);
if (name.isEmpty() || value.isEmpty()) {
throw new IllegalVariableEvaluationException("Metric name and value can't be null");
}
return Counter.of(name.get(), value.get(), tagsAsStrings);
return Counter.of(name, description, value, tagsAsStrings);
}
public String getType() {

View File

@@ -11,7 +11,6 @@ import lombok.experimental.SuperBuilder;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
@ToString
@@ -29,18 +28,15 @@ public class TimerMetric extends AbstractMetric {
@Override
public AbstractMetricEntry<?> toMetric(RunContext runContext) throws IllegalVariableEvaluationException {
Optional<String> name = runContext.render(this.name).as(String.class);
Optional<Duration> value = runContext.render(this.value).as(Duration.class);
String name = runContext.render(this.name).as(String.class).orElseThrow();
Duration value = runContext.render(this.value).as(Duration.class).orElseThrow();
String description = runContext.render(this.description).as(String.class).orElse(null);
Map<String, String> tags = runContext.render(this.tags).asMap(String.class, String.class);
String[] tagsAsStrings = tags.entrySet().stream()
.flatMap(e -> Stream.of(e.getKey(), e.getValue()))
.toArray(String[]::new);
if (name.isEmpty() || value.isEmpty()) {
throw new IllegalVariableEvaluationException("Metric name and value can't be null");
}
return Timer.of(name.get(), value.get(), tagsAsStrings);
return Timer.of(name, description, value, tagsAsStrings);
}
public String getType() {

View File

@@ -2,38 +2,41 @@ package io.kestra.core.models.tasks.runners;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.models.tasks.runners.TaskLogLineMatcher.TaskLogMatch;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowService;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.event.Level;
import org.slf4j.spi.LoggingEventBuilder;
import java.io.*;
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
import static io.kestra.core.utils.Rethrow.throwConsumer;
abstract public class PluginUtilsService {
private static final ObjectMapper MAPPER = JacksonMapper.ofJson(false);
private static final Pattern PATTERN = Pattern.compile("^::(\\{.*})::$");
private static final TypeReference<Map<String, String>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
public static Map<String, String> createOutputFiles(
@@ -52,12 +55,12 @@ abstract public class PluginUtilsService {
) throws IOException {
List<String> outputs = new ArrayList<>();
if (outputFiles != null && outputFiles.size() > 0) {
if (outputFiles != null && !outputFiles.isEmpty()) {
outputs.addAll(outputFiles);
}
Map<String, String> result = new HashMap<>();
if (outputs.size() > 0) {
if (!outputs.isEmpty()) {
outputs
.forEach(throwConsumer(s -> {
PluginUtilsService.validFilename(s);
@@ -168,64 +171,27 @@ abstract public class PluginUtilsService {
}
public static Map<String, Object> parseOut(String line, Logger logger, RunContext runContext, boolean isStdErr, Instant customInstant) {
Matcher m = PATTERN.matcher(line);
TaskLogLineMatcher logLineMatcher = ((DefaultRunContext) runContext).getApplicationContext().getBean(TaskLogLineMatcher.class);
Map<String, Object> outputs = new HashMap<>();
if (m.find()) {
try {
BashCommand<?> bashCommand = MAPPER.readValue(m.group(1), BashCommand.class);
if (bashCommand.getOutputs() != null) {
outputs.putAll(bashCommand.getOutputs());
}
if (bashCommand.getMetrics() != null) {
bashCommand.getMetrics().forEach(runContext::metric);
}
if (bashCommand.getLogs() != null) {
bashCommand.getLogs().forEach(logLine -> {
try {
LoggingEventBuilder builder = runContext
.logger()
.atLevel(logLine.getLevel())
.addKeyValue(ORIGINAL_TIMESTAMP_KEY, customInstant);
builder.log(logLine.getMessage());
} catch (Exception e) {
logger.warn("Invalid log '{}'", m.group(1), e);
}
});
}
}
catch (JsonProcessingException e) {
logger.warn("Invalid outputs '{}'", e.getMessage(), e);
}
} else {
if (isStdErr) {
try {
Optional<TaskLogMatch> matches = logLineMatcher.matches(line, logger, runContext, customInstant);
if (matches.isPresent()) {
TaskLogMatch taskLogMatch = matches.get();
outputs.putAll(taskLogMatch.outputs());
} else if (isStdErr) {
runContext.logger().error(line);
} else {
runContext.logger().info(line);
}
} catch (IOException e) {
logger.warn("Invalid outputs '{}'", e.getMessage(), e);
}
return outputs;
}
@NoArgsConstructor
@Data
public static class BashCommand <T> {
private Map<String, Object> outputs;
private List<AbstractMetricEntry<T>> metrics;
private List<LogLine> logs;
}
@NoArgsConstructor
@Data
public static class LogLine {
private Level level;
private String message;
}
/**
* This helper method allows gathering the execution information from a task's parameters:
* - If executionId is null, it is fetched from the runContext variables (a.k.a. current execution).

View File

@@ -0,0 +1,111 @@
package io.kestra.core.models.tasks.runners;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.executions.AbstractMetricEntry;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.event.Level;
import org.slf4j.spi.LoggingEventBuilder;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
/**
* Service for matching and capturing structured data from task execution logs.
* <p>
* Example log format that may be matched:
* <pre>{@code
* ::{"outputs":{"key":"value"}}::
* }</pre>
*/
@Singleton
public class TaskLogLineMatcher {
protected static final Pattern LOG_DATA_SYNTAX = Pattern.compile("^::(\\{.*})::$");
protected static final ObjectMapper MAPPER = JacksonMapper.ofJson(false);
/**
* Attempts to match and extract structured data from a given log line.
* <p>
* If the line contains recognized patterns (e.g., JSON-encoded output markers),
* a {@link TaskLogMatch} is returned encapsulating the extracted data.
* </p>
*
* @param logLine the raw log line.
* @param logger the logger used to report invalid payloads
* @param runContext the {@link RunContext}
* @param instant the original timestamp attached to any forwarded log lines
* @return an {@link Optional} containing the {@link TaskLogMatch} if a match was found,
* otherwise {@link Optional#empty()}
* @throws IOException if the captured JSON payload cannot be deserialized
*/
public Optional<TaskLogMatch> matches(String logLine, Logger logger, RunContext runContext, Instant instant) throws IOException {
Optional<String> matches = matches(logLine);
if (matches.isEmpty()) {
return Optional.empty();
}
TaskLogMatch match = MAPPER.readValue(matches.get(), TaskLogLineMatcher.TaskLogMatch.class);
return Optional.of(handle(logger, runContext, instant, match, matches.get()));
}
protected TaskLogMatch handle(Logger logger, RunContext runContext, Instant instant, TaskLogMatch match, String data) {
if (match.metrics() != null) {
match.metrics().forEach(runContext::metric);
}
if (match.logs() != null) {
match.logs().forEach(it -> {
try {
LoggingEventBuilder builder = runContext
.logger()
.atLevel(it.level())
.addKeyValue(ORIGINAL_TIMESTAMP_KEY, instant);
builder.log(it.message());
} catch (Exception e) {
logger.warn("Invalid log '{}'",data, e);
}
});
}
return match;
}
protected Optional<String> matches(String logLine) {
Matcher m = LOG_DATA_SYNTAX.matcher(logLine);
return m.find() ? Optional.ofNullable(m.group(1)) : Optional.empty();
}
/**
* Represents the result of a log line match.
*
* @param outputs a map of extracted output key-value pairs
* @param metrics a list of captured metric entries, typically used for reporting or monitoring
* @param logs additional log lines derived from the matched line, if any
*/
public record TaskLogMatch(
Map<String, Object> outputs,
List<AbstractMetricEntry<?>> metrics,
List<LogLine> logs
) {
@Override
public Map<String, Object> outputs() {
return Optional.ofNullable(outputs).orElse(Map.of());
}
}
public record LogLine(
Level level,
String message
) {
}
}
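
The capture convention documented above is small enough to sketch in isolation. The following is a minimal, self-contained illustration of the same ::{...}:: pattern using only java.util.regex and Jackson; it is not the Kestra class itself, just a sketch of the matching step it performs.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class LogDataCaptureSketch {
    // Same shape as LOG_DATA_SYNTAX above: the whole line must be ::{...}::
    private static final Pattern LOG_DATA_SYNTAX = Pattern.compile("^::(\\{.*})::$");
    private static final ObjectMapper MAPPER = new ObjectMapper();

    static Optional<Map<String, Object>> capture(String line) throws Exception {
        Matcher m = LOG_DATA_SYNTAX.matcher(line);
        if (!m.find()) {
            return Optional.empty(); // ordinary log line, nothing to extract
        }
        // group(1) is the JSON payload, e.g. {"outputs":{"key":"value"}}
        return Optional.of(MAPPER.readValue(m.group(1), new TypeReference<Map<String, Object>>() {}));
    }

    public static void main(String[] args) throws Exception {
        System.out.println(capture("::{\"outputs\":{\"key\":\"value\"}}::")); // Optional[{outputs={key=value}}]
        System.out.println(capture("a regular stdout line"));                 // Optional.empty
    }
}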

View File

@@ -34,6 +34,7 @@ public class PluginClassLoader extends URLClassLoader {
+ "|io.kestra.plugin.core"
+ "|org.slf4j"
+ "|ch.qos.logback"
+ "|io.swagger"
+ "|com.fasterxml.jackson.core"
+ "|com.fasterxml.jackson.annotation"
+ "|com.fasterxml.jackson.module"

View File

@@ -1,5 +1,6 @@
package io.kestra.core.repositories;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.models.flows.FlowId;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
@@ -95,7 +96,7 @@ public class LocalFlowRepositoryLoader {
flowRepository.update(parsed, existing);
log.trace("Updated flow {}.{}", parsed.getNamespace(), parsed.getId());
}
} catch (ConstraintViolationException e) {
} catch (FlowProcessingException | ConstraintViolationException e) {
log.warn("Unable to create flow {}", file, e);
}
}));

View File

@@ -413,7 +413,8 @@ public class DefaultRunContext extends RunContext {
}
try {
metricEntry.register(this.meterRegistry, this.metricPrefix(), this.metricsTags());
// FIXME there seems to be a bug as the metric name is never used
metricEntry.register(this.meterRegistry, this.metricPrefix(), metricEntry.getDescription(), this.metricsTags());
} catch (IllegalArgumentException e) {
// https://github.com/micrometer-metrics/micrometer/issues/877
// https://github.com/micrometer-metrics/micrometer/issues/2399

View File

@@ -7,7 +7,6 @@ import io.kestra.core.models.Label;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.sla.Violation;
import io.kestra.core.models.tasks.*;
@@ -145,6 +144,7 @@ public class ExecutorService {
return executor;
}
long nanos = System.nanoTime();
try {
executor = this.handleRestart(executor);
executor = this.handleEnd(executor);
@@ -175,6 +175,10 @@ public class ExecutorService {
executor = this.handleExecutableTask(executor);
} catch (Exception e) {
return executor.withException(e, "process");
} finally {
metricRegistry
.timer(MetricRegistry.METRIC_EXECUTOR_EXECUTION_MESSAGE_PROCESS_DURATION, MetricRegistry.METRIC_EXECUTOR_EXECUTION_MESSAGE_PROCESS_DURATION_DESCRIPTION, metricRegistry.tags(executor.getExecution()))
.record(Duration.ofNanos(System.nanoTime() - nanos));
}
return executor;
@@ -206,7 +210,7 @@ public class ExecutorService {
if (execution.getState().getCurrent() == State.Type.CREATED) {
metricRegistry
.counter(MetricRegistry.EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(execution))
.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_STARTED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_STARTED_COUNT_DESCRIPTION, metricRegistry.tags(execution))
.increment();
logService.logExecution(
@@ -218,10 +222,6 @@ public class ExecutorService {
newExecution = newExecution.withState(State.Type.RUNNING);
}
metricRegistry
.counter(MetricRegistry.EXECUTOR_TASKRUN_NEXT_COUNT, metricRegistry.tags(execution))
.increment(nexts.size());
return newExecution;
}
@@ -300,18 +300,7 @@ public class ExecutorService {
TaskRun taskRun
) {
return findState
.map(throwFunction(type -> new WorkerTaskResult(taskRun.withState(type))))
.stream()
.peek(workerTaskResult -> {
metricRegistry
.counter(
MetricRegistry.EXECUTOR_WORKERTASKRESULT_COUNT,
metricRegistry.tags(workerTaskResult)
)
.increment();
})
.findFirst();
.map(throwFunction(type -> new WorkerTaskResult(taskRun.withState(type))));
}
private List<TaskRun> childNextsTaskRun(Executor executor, TaskRun parentTaskRun) throws InternalException {
@@ -414,11 +403,11 @@ public class ExecutorService {
}
metricRegistry
.counter(MetricRegistry.EXECUTOR_EXECUTION_END_COUNT, metricRegistry.tags(newExecution))
.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_END_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_END_COUNT_DESCRIPTION, metricRegistry.tags(newExecution))
.increment();
metricRegistry
.timer(MetricRegistry.EXECUTOR_EXECUTION_DURATION, metricRegistry.tags(newExecution))
.timer(MetricRegistry.METRIC_EXECUTOR_EXECUTION_DURATION, MetricRegistry.METRIC_EXECUTOR_EXECUTION_DURATION_DESCRIPTION, metricRegistry.tags(newExecution))
.record(newExecution.getState().getDuration());
return executor.withExecution(newExecution, "onEnd");
@@ -626,16 +615,19 @@ public class ExecutorService {
Task task = executor.getFlow().findTaskByTaskId(workerTaskResult.getTaskRun().getTaskId());
if (task instanceof Pause pauseTask) {
if (pauseTask.getDelay() != null || pauseTask.getTimeout() != null) {
if (pauseTask.getPauseDuration() != null || pauseTask.getTimeout() != null) {
RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
Duration delay = runContext.render(pauseTask.getDelay()).as(Duration.class).orElse(null);
Duration duration = runContext.render(pauseTask.getPauseDuration()).as(Duration.class).orElse(null);
Duration timeout = runContext.render(pauseTask.getTimeout()).as(Duration.class).orElse(null);
if (delay != null || timeout != null) { // rendering can lead to null, so we must re-check here
Pause.Behavior behavior = runContext.render(pauseTask.getBehavior()).as(Pause.Behavior.class).orElse(Pause.Behavior.RESUME);
if (duration != null || timeout != null) { // rendering can lead to null, so we must re-check here
// if duration is set, we use it, and we use the Pause behavior as a state
// if no duration, we use the standard timeout property and use FAILED as the target state
return ExecutionDelay.builder()
.taskRunId(workerTaskResult.getTaskRun().getId())
.executionId(executor.getExecution().getId())
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(delay != null ? delay : timeout))
.state(delay != null ? State.Type.RUNNING : State.Type.FAILED)
.date(workerTaskResult.getTaskRun().getState().maxDate().plus(duration != null ? duration : timeout))
.state(duration != null ? behavior.mapToState() : State.Type.FAILED)
.delayType(ExecutionDelay.DelayType.RESUME_FLOW)
.build();
}
@@ -736,7 +728,7 @@ public class ExecutorService {
}
metricRegistry
.counter(MetricRegistry.EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(executor.getExecution()))
.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_STARTED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_STARTED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution()))
.increment();
logService.logExecution(
@@ -848,6 +840,8 @@ public class ExecutorService {
List<WorkerTask> processingTasks = workerTasks.get(false);
if (processingTasks != null && !processingTasks.isEmpty()) {
executorToReturn = executorToReturn.withWorkerTasks(processingTasks, "handleWorkerTask");
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_TASKRUN_CREATED_COUNT, MetricRegistry.METRIC_EXECUTOR_TASKRUN_CREATED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution())).increment(processingTasks.size());
}
return executorToReturn;
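
A pattern repeated throughout this compare is that every MetricRegistry.counter(...) / .timer(...) call now passes a description constant alongside the metric name. Assuming Kestra's MetricRegistry delegates to a Micrometer MeterRegistry (the micrometer-metrics issue links in the DefaultRunContext hunk above point that way), the sketch below shows what a described counter and timer look like when built with Micrometer directly; the metric names and description strings are illustrative placeholders, not the values of the METRIC_* constants.

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.time.Duration;

class DescribedMetricsSketch {
    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();

        // Counter registered with a description and tags.
        Counter started = Counter.builder("executor.execution.started.count")
            .description("Executions that started running")
            .tags("namespace", "company.team")
            .register(registry);
        started.increment();

        // Timer registered with a description, recording an elapsed duration.
        Timer processDuration = Timer.builder("executor.execution.message.process.duration")
            .description("Time spent processing one execution message")
            .register(registry);
        processDuration.record(Duration.ofMillis(42));

        registry.getMeters().forEach(meter -> System.out.println(meter.getId()));
    }
}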

View File

@@ -417,6 +417,7 @@ public class FlowInputOutput {
// Assuming that after the render we must have a double/int, so we can safely use its toString representation
case FLOAT -> current instanceof Float ? current : Float.valueOf(current.toString());
case BOOLEAN -> current instanceof Boolean ? current : Boolean.valueOf(current.toString());
case BOOL -> current instanceof Boolean ? current : Boolean.valueOf(current.toString());
case DATETIME -> current instanceof Instant ? current : Instant.parse(current.toString());
case DATE -> current instanceof LocalDate ? current : LocalDate.parse(current.toString());
case TIME -> current instanceof LocalTime ? current : LocalTime.parse(current.toString());
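
The comment in this hunk carries the whole contract: after rendering, the value is either already of the target type or a string whose toString() parses cleanly. A tiny JDK-only illustration of the BOOL coercion added here, plus the neighbouring cases, purely for reference (not the FlowInputOutput code itself):

import java.time.Instant;

class InputCoercionSketch {
    // Mirrors: current instanceof Boolean ? current : Boolean.valueOf(current.toString())
    static Object coerceBool(Object current) {
        return current instanceof Boolean ? current : Boolean.valueOf(current.toString());
    }

    public static void main(String[] args) {
        System.out.println(coerceBool("true"));                     // parsed from the rendered string
        System.out.println(coerceBool(Boolean.FALSE));              // already typed, passed through
        System.out.println(Float.valueOf("3.14"));                  // FLOAT case
        System.out.println(Instant.parse("2025-04-22T12:00:00Z")); // DATETIME case
    }
}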

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
@@ -53,14 +54,20 @@ public class FlowListeners implements FlowListenersInterface {
FlowWithSource flow;
if (either.isRight()) {
flow = FlowWithException.from(either.getRight().getRecord(), either.getRight(), log).orElse(null);
if (flow == null) {
return;
}
} else {
flow = pluginDefaultService.injectVersionDefaults(either.getLeft(), true);
try {
flow = pluginDefaultService.injectVersionDefaults(either.getLeft(), true);
} catch (FlowProcessingException ignore) {
// should not occur, safe = true...
flow = null;
}
}
Optional<FlowWithSource> previous = this.previous(flow);
if (flow == null) {
return;
}
final FlowWithSource previous = this.previous(flow).orElse(null);
if (flow.isDeleted()) {
this.remove(flow);
@@ -77,7 +84,7 @@ public class FlowListeners implements FlowListenersInterface {
);
}
this.notifyConsumersEach(flow, previous.orElse(null));
this.notifyConsumersEach(flow, previous);
this.notifyConsumers();
});
@@ -109,7 +116,6 @@ public class FlowListeners implements FlowListenersInterface {
private void upsert(FlowWithSource flow) {
synchronized (this) {
this.remove(flow);
this.flows.add(flow);
}
}

View File

@@ -30,7 +30,7 @@ public class RunContextCache {
@PostConstruct
void init() {
String envPrefix = applicationContext.getProperty("kestra.variables.env-vars-prefix", String.class, "KESTRA_");
String envPrefix = applicationContext.getProperty("kestra.variables.env-vars-prefix", String.class, "ENV_");
envVars = this.envVariables(envPrefix);
globalVars = applicationContext

View File

@@ -234,8 +234,7 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
private String replaceSecret(String data) {
for (String s : runContextLogger.useSecrets) {
if (data.contains(s)) {
data = data.replace(s, "*".repeat(s.length()));
data = data.replaceFirst("[*]{9}", "**masked*");
data = data.replace(s, "******");
}
}

View File

@@ -184,9 +184,9 @@ public class Worker implements Service, Runnable, AutoCloseable {
if (this.init.compareAndSet(false, true)) {
String[] tags = this.workerGroup == null ? new String[0] : new String[]{MetricRegistry.TAG_WORKER_GROUP, this.workerGroup};
// create metrics to store thread count, pending jobs and running jobs, so we can have autoscaling easily
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT, numThreads, tags);
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT, pendingJobCount, tags);
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_RUNNING_COUNT, runningJobCount, tags);
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT, MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT_DESCRIPTION, numThreads, tags);
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT, MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT_DESCRIPTION, pendingJobCount, tags);
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_RUNNING_COUNT, MetricRegistry.METRIC_WORKER_JOB_RUNNING_COUNT_DESCRIPTION, runningJobCount, tags);
this.tracer = tracerFactory.getTracer(Worker.class, "WORKER");
}
@@ -420,7 +420,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
private void publishTriggerExecution(WorkerTrigger workerTrigger, Optional<Execution> evaluate) {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_EXECUTION_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_EXECUTION_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_EXECUTION_COUNT_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))
.increment();
if (log.isDebugEnabled()) {
@@ -458,7 +458,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
private void handleTriggerError(WorkerTrigger workerTrigger, Throwable e) {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))
.increment();
logError(workerTrigger, e);
@@ -478,7 +478,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
private void handleRealtimeTriggerError(WorkerTrigger workerTrigger, Throwable e) {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_ERROR_COUNT_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))
.increment();
// We create a FAILED execution, so the user is aware that the realtime trigger failed to be created
@@ -518,7 +518,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
private void handleTrigger(WorkerTrigger workerTrigger) {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_STARTED_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_STARTED_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_STARTED_COUNT_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))
.increment();
// update the trigger so that it contains the workerId
@@ -531,13 +531,13 @@ public class Worker implements Service, Runnable, AutoCloseable {
}
this.metricRegistry
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, metricRegistry.tags(workerTrigger, workerGroup))
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, MetricRegistry.METRIC_WORKER_TRIGGER_DURATION_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))
.record(() -> {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
this.evaluateTriggerRunningCount.computeIfAbsent(workerTrigger.getTriggerContext().uid(), s -> metricRegistry
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger, workerGroup)));
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT_DESCRIPTION, new AtomicInteger(0), metricRegistry.tags(workerTrigger, workerGroup)));
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1);
DefaultRunContext runContext = (DefaultRunContext) workerTrigger.getConditionContext().getRunContext();
@@ -598,18 +598,18 @@ public class Worker implements Service, Runnable, AutoCloseable {
);
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ENDED_COUNT, metricRegistry.tags(workerTrigger, workerGroup))
.counter(MetricRegistry.METRIC_WORKER_TRIGGER_ENDED_COUNT, MetricRegistry.METRIC_WORKER_TRIGGER_ENDED_COUNT_DESCRIPTION, metricRegistry.tags(workerTrigger, workerGroup))
.increment();
}
private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_STARTED_COUNT, metricRegistry.tags(workerTask, workerGroup))
.counter(MetricRegistry.METRIC_WORKER_STARTED_COUNT, MetricRegistry.METRIC_WORKER_STARTED_COUNT_DESCRIPTION, metricRegistry.tags(workerTask, workerGroup))
.increment();
if (workerTask.getTaskRun().getState().getCurrent() == CREATED) {
metricRegistry
.timer(MetricRegistry.METRIC_WORKER_QUEUED_DURATION, metricRegistry.tags(workerTask, workerGroup))
.timer(MetricRegistry.METRIC_WORKER_QUEUED_DURATION, MetricRegistry.METRIC_WORKER_QUEUED_DURATION_DESCRIPTION, metricRegistry.tags(workerTask, workerGroup))
.record(Duration.between(
workerTask.getTaskRun().getState().getStartDate(), Instant.now()
));
@@ -639,11 +639,6 @@ public class Worker implements Service, Runnable, AutoCloseable {
);
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(RUNNING));
try {
this.workerTaskResultQueue.emit(new WorkerTaskResult(workerTask.getTaskRun()));
} catch (QueueException e) {
log.error("Unable to emit the worker task result for task {} taskrun {}", workerTask.getTask().getId(), workerTask.getTaskRun().getId(), e);
}
try {
// run
@@ -720,11 +715,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
private void logTerminated(WorkerTask workerTask) {
metricRegistry
.counter(MetricRegistry.METRIC_WORKER_ENDED_COUNT, metricRegistry.tags(workerTask, workerGroup))
.counter(MetricRegistry.METRIC_WORKER_ENDED_COUNT, MetricRegistry.METRIC_WORKER_ENDED_COUNT_DESCRIPTION, metricRegistry.tags(workerTask, workerGroup))
.increment();
metricRegistry
.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, metricRegistry.tags(workerTask, workerGroup))
.timer(MetricRegistry.METRIC_WORKER_ENDED_DURATION, MetricRegistry.METRIC_WORKER_ENDED_DURATION_DESCRIPTION, metricRegistry.tags(workerTask, workerGroup))
.record(workerTask.getTaskRun().getState().getDuration());
logService.logTaskRun(
@@ -784,20 +779,18 @@ public class Worker implements Service, Runnable, AutoCloseable {
TaskRunAttempt.TaskRunAttemptBuilder builder = TaskRunAttempt.builder()
.state(new io.kestra.core.models.flows.State().withState(RUNNING));
AtomicInteger metricRunningCount = getMetricRunningCount(workerTask);
metricRunningCount.incrementAndGet();
WorkerTaskCallable workerTaskCallable = new WorkerTaskCallable(workerTask, task, runContext, metricRegistry);
// emit attempts
// emit the attempt so the execution knows that the task is in RUNNING
this.workerTaskResultQueue.emit(new WorkerTaskResult(
workerTask.getTaskRun()
.withAttempts(this.addAttempt(workerTask, builder.build()))
)
);
AtomicInteger metricRunningCount = getMetricRunningCount(workerTask);
metricRunningCount.incrementAndGet();
// run it
WorkerTaskCallable workerTaskCallable = new WorkerTaskCallable(workerTask, task, runContext, metricRegistry);
io.kestra.core.models.flows.State.Type state = callJob(workerTaskCallable);
metricRunningCount.decrementAndGet();
@@ -873,6 +866,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
return this.metricRunningCount
.computeIfAbsent(index, l -> metricRegistry.gauge(
MetricRegistry.METRIC_WORKER_RUNNING_COUNT,
MetricRegistry.METRIC_WORKER_RUNNING_COUNT_DESCRIPTION,
new AtomicInteger(0),
metricRegistry.tags(workerTask, workerGroup)
));
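
The worker gauges above exist so that thread count, pending jobs and running jobs can drive autoscaling, and with this change they carry descriptions too. Below is a minimal Micrometer sketch of a described gauge backed by an AtomicInteger; the gauge name, description and tag are illustrative, not Kestra's METRIC_WORKER_* constant values.

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.concurrent.atomic.AtomicInteger;

class WorkerGaugeSketch {
    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();
        AtomicInteger pendingJobs = new AtomicInteger(0);

        // The gauge samples the AtomicInteger on each scrape; the worker only mutates the counter.
        Gauge.builder("worker.job.pending", pendingJobs, AtomicInteger::get)
            .description("Jobs waiting for a worker thread")
            .tag("worker_group", "default")
            .register(registry);

        pendingJobs.incrementAndGet();
        pendingJobs.incrementAndGet();
        System.out.println(registry.get("worker.job.pending").gauge().value()); // 2.0
    }
}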

View File

@@ -66,6 +66,7 @@ public class WorkerTaskCallable extends AbstractWorkerCallable {
.onFailure(event -> metricRegistry
.counter(
MetricRegistry.METRIC_WORKER_TIMEOUT_COUNT,
MetricRegistry.METRIC_WORKER_TIMEOUT_COUNT_DESCRIPTION,
metricRegistry.tags(
this.workerTask,
MetricRegistry.TAG_ATTEMPT_COUNT, String.valueOf(event.getAttemptCount())

View File

@@ -534,7 +534,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
List<FlowWithTriggers> schedulable = this.computeSchedulable(flowWithDefaults, triggerContextsToEvaluate, scheduleContext);
metricRegistry
.counter(MetricRegistry.SCHEDULER_LOOP_COUNT)
.counter(MetricRegistry.METRIC_SCHEDULER_LOOP_COUNT, MetricRegistry.METRIC_SCHEDULER_LOOP_COUNT_DESCRIPTION)
.increment();
if (log.isTraceEnabled()) {
@@ -577,7 +577,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
}
metricRegistry
.counter(MetricRegistry.SCHEDULER_EVALUATE_COUNT)
.counter(MetricRegistry.METRIC_SCHEDULER_EVALUATE_COUNT, MetricRegistry.METRIC_SCHEDULER_EVALUATE_COUNT_DESCRIPTION)
.increment(readyForEvaluate.size());
// submit ready one to the worker
@@ -660,6 +660,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
}
});
});
metricRegistry
.timer(MetricRegistry.METRIC_SCHEDULER_EVALUATION_LOOP_DURATION, MetricRegistry.METRIC_SCHEDULER_EVALUATION_LOOP_DURATION_DESCRIPTION)
.record(Duration.between(now, ZonedDateTime.now()));
}
private List<FlowWithSource> getFlowsWithDefaults() {
@@ -744,7 +747,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
if (execution.isEmpty()) {
if (lastTrigger.getUpdatedDate() != null) {
metricRegistry
.timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger))
.timer(MetricRegistry.METRIC_SCHEDULER_EXECUTION_MISSING_DURATION, MetricRegistry.METRIC_SCHEDULER_EXECUTION_MISSING_DURATION_DESCRIPTION, metricRegistry.tags(lastTrigger))
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
}
@@ -763,7 +766,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
if (lastTrigger.getUpdatedDate() != null) {
metricRegistry
.timer(MetricRegistry.SCHEDULER_EXECUTION_RUNNING_DURATION, metricRegistry.tags(lastTrigger))
.timer(MetricRegistry.METRIC_SCHEDULER_EXECUTION_LOCK_DURATION, MetricRegistry.METRIC_SCHEDULER_EXECUTION_LOCK_DURATION_DESCRIPTION, metricRegistry.tags(lastTrigger))
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
}
@@ -783,7 +786,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private void log(SchedulerExecutionWithTrigger executionWithTrigger) {
metricRegistry
.counter(MetricRegistry.SCHEDULER_TRIGGER_COUNT, metricRegistry.tags(executionWithTrigger))
.counter(MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT, MetricRegistry.METRIC_SCHEDULER_TRIGGER_COUNT_DESCRIPTION, metricRegistry.tags(executionWithTrigger))
.increment();
ZonedDateTime now = now();
@@ -800,7 +803,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// FIXME : "late" are not excluded and can increase delay value (false positive)
if (next != null && now.isBefore(next)) {
metricRegistry
.timer(MetricRegistry.SCHEDULER_TRIGGER_DELAY_DURATION, metricRegistry.tags(executionWithTrigger))
.timer(MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION, MetricRegistry.METRIC_SCHEDULER_TRIGGER_DELAY_DURATION_DESCRIPTION, metricRegistry.tags(executionWithTrigger))
.record(Duration.between(
executionWithTrigger.getTriggerContext().getDate(), now
));

View File

@@ -6,11 +6,12 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.validations.ManualConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import jakarta.validation.ConstraintViolationException;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -42,7 +43,7 @@ public final class YamlParser {
return currentMapper.convertValue(input, cls);
} catch (IllegalArgumentException e) {
if(e.getCause() instanceof JsonProcessingException jsonProcessingException) {
jsonProcessingExceptionHandler(input, type(cls), jsonProcessingException);
throw toConstraintViolationException(input, type(cls), jsonProcessingException);
}
throw e;
@@ -50,7 +51,7 @@ public final class YamlParser {
}
private static <T> String type(Class<T> cls) {
return cls.getSimpleName().toLowerCase();
return FlowInterface.class.isAssignableFrom(cls) ? "flow" : cls.getSimpleName().toLowerCase();
}
public static <T> T parse(File file, Class<T> cls) throws ConstraintViolationException {
@@ -78,18 +79,17 @@ public final class YamlParser {
try {
return STRICT_MAPPER.readValue(input, objectClass);
} catch (JsonProcessingException e) {
jsonProcessingExceptionHandler(input, resource, e);
throw toConstraintViolationException(input, resource, e);
}
return null;
}
@SuppressWarnings("unchecked")
private static <T> void jsonProcessingExceptionHandler(T target, String resource, JsonProcessingException e) throws ConstraintViolationException {
public static <T> ConstraintViolationException toConstraintViolationException(T target, String resource, JsonProcessingException e) {
if (e.getCause() instanceof ConstraintViolationException constraintViolationException) {
throw constraintViolationException;
return constraintViolationException;
} else if (e instanceof InvalidTypeIdException invalidTypeIdException) {
// This error is thrown when a non-existing task is used
throw new ConstraintViolationException(
return new ConstraintViolationException(
"Invalid type: " + invalidTypeIdException.getTypeId(),
Set.of(
ManualConstraintViolation.of(
@@ -110,7 +110,7 @@ public final class YamlParser {
);
} else if (e instanceof UnrecognizedPropertyException unrecognizedPropertyException) {
var message = unrecognizedPropertyException.getOriginalMessage() + unrecognizedPropertyException.getMessageSuffix();
throw new ConstraintViolationException(
return new ConstraintViolationException(
message,
Collections.singleton(
ManualConstraintViolation.of(
@@ -122,8 +122,8 @@ public final class YamlParser {
)
));
} else {
throw new ConstraintViolationException(
"Illegal "+ resource +" yaml: " + e.getMessage(),
return new ConstraintViolationException(
"Illegal " + resource + " source: " + e.getMessage(),
Collections.singleton(
ManualConstraintViolation.of(
e.getCause() == null ? e.getMessage() : e.getMessage() + "\nCaused by: " + e.getCause().getMessage(),

View File

@@ -2,6 +2,7 @@ package io.kestra.core.services;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.conditions.ConditionContext;
@@ -343,7 +344,7 @@ public class ExecutionService {
}
// if it's a Pause task with no subtask, we terminate the task
if (task instanceof Pause pauseTask && pauseTask.getTasks() == null) {
if (task instanceof Pause pauseTask && ListUtils.isEmpty(pauseTask.getTasks())) {
if (newState == State.Type.RUNNING) {
newTaskRun = newTaskRun.withState(State.Type.SUCCESS);
} else if (newState == State.Type.KILLING) {
@@ -364,11 +365,12 @@ public class ExecutionService {
}
if (newExecution.getTaskRunList().stream().anyMatch(t -> t.getState().getCurrent() == State.Type.PAUSED)) {
// there is still some tasks paused, this can occur with parallel pause
// there are still some tasks paused, this can occur with parallel pause
return newExecution;
}
return newExecution
.withState(State.Type.RESTARTED);
// we need to cancel immediately or the executor will process the next task if it's restarted.
return newState == State.Type.CANCELLED ? newExecution.withState(State.Type.CANCELLED) : newExecution.withState(State.Type.RESTARTED);
}
public Execution markWithTaskRunAs(final Execution execution, String taskRunId, State.Type newState, Boolean markParents) throws Exception {
@@ -554,15 +556,14 @@ public class ExecutionService {
}
private Mono<Optional<Task>> getFirstPausedTaskOr(Execution execution, FlowInterface flow){
final FlowWithSource flowWithSource = pluginDefaultService.injectVersionDefaults(flow, false);
return Mono.create(sink -> {
try {
final FlowWithSource flowWithSource = pluginDefaultService.injectVersionDefaults(flow, false);
var runningTaskRun = execution
.findFirstByState(State.Type.PAUSED)
.map(throwFunction(task -> flowWithSource.findTaskByTaskId(task.getTaskId())));
sink.success(runningTaskRun);
} catch (InternalException e) {
} catch (InternalException | FlowProcessingException e) {
sink.error(e);
}
});
@@ -655,7 +656,7 @@ public class ExecutionService {
*
* @return the execution in a KILLING state if not already terminated
*/
public Execution kill(Execution execution, Flow flow) {
public Execution kill(Execution execution, FlowInterface flow) {
if (execution.getState().getCurrent() == State.Type.KILLING || execution.getState().isTerminated()) {
return execution;
}
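
Note the shape of the getFirstPausedTaskOr change above: injectVersionDefaults moved inside Mono.create, so a FlowProcessingException now reaches the subscriber through sink.error instead of being thrown while the Mono is assembled. A reduced Reactor sketch of that pattern, with a hypothetical findTask helper standing in for the Kestra calls:

import reactor.core.publisher.Mono;
import java.util.Optional;

class MonoErrorPropagationSketch {
    static Optional<String> findTask(String flow) throws Exception {
        if (flow == null) {
            throw new Exception("flow processing failed"); // stand-in for FlowProcessingException
        }
        return Optional.of(flow + ":first-paused-task");
    }

    static Mono<Optional<String>> firstPausedTaskOr(String flow) {
        // Doing the risky work inside create() routes failures to sink.error,
        // so subscribers see onError instead of an exception at assembly time.
        return Mono.create(sink -> {
            try {
                sink.success(findTask(flow));
            } catch (Exception e) {
                sink.error(e);
            }
        });
    }

    public static void main(String[] args) {
        firstPausedTaskOr("my-flow").subscribe(System.out::println);
        firstPausedTaskOr(null).subscribe(
            ok -> {},
            err -> System.out.println("propagated: " + err.getMessage())
        );
    }
}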

View File

@@ -1,6 +1,8 @@
package io.kestra.core.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowId;
@@ -70,7 +72,7 @@ public class FlowService {
* @param strictValidation Specifies whether to perform a strict validation of the flow.
* @return The created {@link FlowWithSource}.
*/
public FlowWithSource create(final GenericFlow flow, final boolean strictValidation) {
public FlowWithSource create(final GenericFlow flow, final boolean strictValidation) throws FlowProcessingException {
Objects.requireNonNull(flow, "Cannot create null flow");
if (flow.getSource() == null || flow.getSource().isBlank()) {
throw new IllegalArgumentException("Cannot create flow with null or blank source");
@@ -123,6 +125,13 @@ public class FlowService {
modelValidator.validate(flow);
} catch (ConstraintViolationException e) {
validateConstraintViolationBuilder.constraints(e.getMessage());
} catch (FlowProcessingException e) {
if (e.getCause() instanceof ConstraintViolationException) {
validateConstraintViolationBuilder.constraints(e.getMessage());
} else {
Throwable cause = e.getCause() != null ? e.getCause() : e;
validateConstraintViolationBuilder.constraints("Unable to validate the flow: " + cause.getMessage());
}
} catch (RuntimeException re) {
// In case of any error, we add a validation violation so the error is displayed in the UI.
// We may change that by throwing an internal error and handling it in the UI, but this should not occur except in rare cases
@@ -130,25 +139,20 @@ public class FlowService {
log.error("Unable to validate the flow", re);
validateConstraintViolationBuilder.constraints("Unable to validate the flow: " + re.getMessage());
}
return validateConstraintViolationBuilder.build();
})
.collect(Collectors.toList());
}
public FlowWithSource importFlow(String tenantId, String source) {
public FlowWithSource importFlow(String tenantId, String source) throws FlowProcessingException {
return this.importFlow(tenantId, source, false);
}
public FlowWithSource importFlow(String tenantId, String source, boolean dryRun) {
if (flowRepository.isEmpty()) {
throw noRepositoryException();
}
public FlowWithSource importFlow(String tenantId, String source, boolean dryRun) throws FlowProcessingException {
final GenericFlow flow = GenericFlow.fromYaml(tenantId, source);
FlowRepositoryInterface flowRepository = this.flowRepository.get();
Optional<FlowWithSource> maybeExisting = flowRepository.findByIdWithSource(
Optional<FlowWithSource> maybeExisting = repository().findByIdWithSource(
flow.getTenantId(),
flow.getNamespace(),
flow.getId(),
@@ -169,8 +173,8 @@ public class FlowService {
.orElseGet(() -> FlowWithSource.of(flowToImport, source).toBuilder().revision(1).build());
} else {
return maybeExisting
.map(previous -> flowRepository.update(flow, previous))
.orElseGet(() -> flowRepository.create(flow));
.map(previous -> repository().update(flow, previous))
.orElseGet(() -> repository().create(flow));
}
}
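
Since create and importFlow now declare the checked FlowProcessingException, callers must handle it explicitly. A hedged caller-side sketch, assuming a FlowService instance obtained through injection; the handling style is illustrative only:

import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.services.FlowService;
import java.util.Optional;

class ImportFlowCallerSketch {
    private final FlowService flowService;

    ImportFlowCallerSketch(FlowService flowService) {
        this.flowService = flowService; // normally injected by Micronaut
    }

    Optional<FlowWithSource> tryImport(String tenantId, String source) {
        try {
            return Optional.of(flowService.importFlow(tenantId, source));
        } catch (FlowProcessingException e) {
            // Per this change set, the cause is usually a ConstraintViolationException raised while parsing the source.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            System.err.println("Unable to import flow: " + cause.getMessage());
            return Optional.empty();
        }
    }
}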

View File

@@ -1,5 +1,6 @@
package io.kestra.core.services;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowWithSource;
@@ -15,6 +16,7 @@ import io.kestra.core.utils.GraphUtils;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
@@ -34,27 +36,28 @@ public class GraphService {
@Inject
private RunContextFactory runContextFactory;
public FlowGraph flowGraph(FlowWithSource flow, List<String> expandedSubflows) throws IllegalVariableEvaluationException {
public FlowGraph flowGraph(FlowWithSource flow, List<String> expandedSubflows) throws IllegalVariableEvaluationException, FlowProcessingException {
return this.flowGraph(flow, expandedSubflows, null);
}
public FlowGraph flowGraph(FlowWithSource flow, List<String> expandedSubflows, Execution execution) throws IllegalVariableEvaluationException {
public FlowGraph flowGraph(FlowWithSource flow, List<String> expandedSubflows, Execution execution) throws IllegalVariableEvaluationException, FlowProcessingException {
return FlowGraph.of(this.of(flow, Optional.ofNullable(expandedSubflows).orElse(Collections.emptyList()), new HashMap<>(), execution));
}
public FlowGraph executionGraph(FlowWithSource flow, List<String> expandedSubflows, Execution execution) throws IllegalVariableEvaluationException {
public FlowGraph executionGraph(FlowWithSource flow, List<String> expandedSubflows, Execution execution) throws IllegalVariableEvaluationException, FlowProcessingException {
return FlowGraph.of(this.of(flow, Optional.ofNullable(expandedSubflows).orElse(Collections.emptyList()), new HashMap<>(), execution));
}
public GraphCluster of(FlowWithSource flow, List<String> expandedSubflows, Map<String, FlowWithSource> flowByUid, Execution execution) throws IllegalVariableEvaluationException {
public GraphCluster of(FlowWithSource flow, List<String> expandedSubflows, Map<String, FlowWithSource> flowByUid, Execution execution) throws IllegalVariableEvaluationException, FlowProcessingException {
return this.of(null, flow, expandedSubflows, flowByUid, execution);
}
public GraphCluster of(GraphCluster baseGraph, FlowWithSource flow, List<String> expandedSubflows, Map<String, FlowWithSource> flowByUid) throws IllegalVariableEvaluationException {
public GraphCluster of(GraphCluster baseGraph, FlowWithSource flow, List<String> expandedSubflows, Map<String, FlowWithSource> flowByUid) throws IllegalVariableEvaluationException, FlowProcessingException {
return this.of(baseGraph, flow, expandedSubflows, flowByUid, null);
}
public GraphCluster of(GraphCluster baseGraph, FlowWithSource flow, List<String> expandedSubflows, Map<String, FlowWithSource> flowByUid, Execution execution) throws IllegalVariableEvaluationException {
@SneakyThrows
public GraphCluster of(GraphCluster baseGraph, FlowWithSource flow, List<String> expandedSubflows, Map<String, FlowWithSource> flowByUid, Execution execution) throws IllegalVariableEvaluationException, FlowProcessingException {
String tenantId = flow.getTenantId();
flow = pluginDefaultService.injectAllDefaults(flow, false);
List<Trigger> triggers = null;

View File

@@ -6,6 +6,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import io.kestra.core.exceptions.FlowProcessingException;
import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.models.Plugin;
import io.kestra.core.models.executions.Execution;
@@ -196,14 +197,8 @@ public class PluginDefaultService {
try {
return this.injectAllDefaults(flow, false);
} catch (Exception e) {
logger.warn(
"Can't inject plugin defaults on tenant {}, namespace '{}', flow '{}' with errors '{}'",
flow.getTenantId(),
flow.getNamespace(),
flow.getId(),
e.getMessage(),
e
);
String cause = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
logService.get().logExecution(flow, logger, Level.WARN, "Unable to inject plugin defaults. Cause: '{}'", cause);
return readWithoutDefaultsOrThrow(flow);
}
}
@@ -238,12 +233,12 @@ public class PluginDefaultService {
* </ul>
*
* @param flow the flow to be parsed
* @param strictParsing specifies if the source must meet strict validation requirements
* @return a parsed {@link FlowWithSource}
*
* @throws ConstraintViolationException if {@code strictParsing} is {@code true} and the source does not meet strict validation requirements
* @throws KestraRuntimeException if an error occurs while parsing the flow and it cannot be processed
* @throws FlowProcessingException if an error occurred while processing the flow
*/
public FlowWithSource injectAllDefaults(final FlowInterface flow, final boolean strictParsing) {
public FlowWithSource injectAllDefaults(final FlowInterface flow, final boolean strictParsing) throws FlowProcessingException {
// Flow revisions created from older Kestra versions may not be linked to their original source.
// In such cases, fall back to the generated source approach to enable plugin default injection.
@@ -256,15 +251,21 @@ public class PluginDefaultService {
throw new IllegalArgumentException(error);
}
return parseFlowWithAllDefaults(
flow.getTenantId(),
flow.getNamespace(),
flow.getRevision(),
flow.isDeleted(),
source,
false,
strictParsing
);
try {
return parseFlowWithAllDefaults(
flow.getTenantId(),
flow.getNamespace(),
flow.getRevision(),
flow.isDeleted(),
source,
false,
strictParsing
);
} catch (ConstraintViolationException e) {
throw new FlowProcessingException(e);
} catch (JsonProcessingException e) {
throw new FlowProcessingException(YamlParser.toConstraintViolationException(source, "Flow", e));
}
}
/**
@@ -282,17 +283,19 @@ public class PluginDefaultService {
* @param flow the flow to be parsed
* @param safe whether parsing errors should be handled gracefully
* @return a parsed {@link FlowWithSource}, or a {@link FlowWithException} if parsing fails and {@code safe} is {@code true}
*
* @throws FlowProcessingException if an error occurred while processing the flow and {@code safe} is {@code false}.
*/
public FlowWithSource injectVersionDefaults(final FlowInterface flow, final boolean safe) {
public FlowWithSource injectVersionDefaults(final FlowInterface flow, final boolean safe) throws FlowProcessingException {
if (flow instanceof FlowWithSource flowWithSource) {
// shortcut - if the flow is already fully parsed return it immediately.
return flowWithSource;
}
FlowWithSource result;
String source = flow.getSource();
try {
try {
String source = flow.getSource();
if (source == null) {
source = OBJECT_MAPPER.writeValueAsString(flow);
}
@@ -300,13 +303,13 @@ public class PluginDefaultService {
result = parseFlowWithAllDefaults(flow.getTenantId(), flow.getNamespace(), flow.getRevision(), flow.isDeleted(), source, true, false);
} catch (Exception e) {
if (safe) {
logService.get().logExecution(flow, log, Level.ERROR, "Failed to read flow.", e);
logService.get().logExecution(flow, log, Level.WARN, "Unable to inject plugin default versions. Cause: {}", e.getMessage());
result = FlowWithException.from(flow, e);
// deleted is not part of the original 'source'
result = result.toBuilder().deleted(flow.isDeleted()).build();
} else {
throw new KestraRuntimeException(e);
throw new FlowProcessingException(e);
}
}
return result;
@@ -314,7 +317,7 @@ public class PluginDefaultService {
public Map<String, Object> injectVersionDefaults(@Nullable final String tenantId,
final String namespace,
final Map<String, Object> mapFlow) {
final Map<String, Object> mapFlow) throws FlowProcessingException {
return innerInjectDefault(tenantId, namespace, mapFlow, true);
}
@@ -325,10 +328,16 @@ public class PluginDefaultService {
* @param source the flow source.
* @return a new {@link FlowWithSource}.
*
* @throws ConstraintViolationException when parsing flow.
* @throws FlowProcessingException when parsing flow.
*/
public FlowWithSource parseFlowWithAllDefaults(@Nullable final String tenantId, final String source, final boolean strict) throws ConstraintViolationException {
return parseFlowWithAllDefaults(tenantId, null, null, false, source, false, strict);
public FlowWithSource parseFlowWithAllDefaults(@Nullable final String tenantId, final String source, final boolean strict) throws FlowProcessingException {
try {
return parseFlowWithAllDefaults(tenantId, null, null, false, source, false, strict);
} catch (ConstraintViolationException e) {
throw new FlowProcessingException(e);
} catch (JsonProcessingException e) {
throw new FlowProcessingException(YamlParser.toConstraintViolationException(source, "Flow", e));
}
}
/**
@@ -348,36 +357,32 @@ public class PluginDefaultService {
final boolean isDeleted,
final String source,
final boolean onlyVersions,
final boolean strictParsing) throws ConstraintViolationException {
try {
Map<String, Object> mapFlow = OBJECT_MAPPER.readValue(source, JacksonMapper.MAP_TYPE_REFERENCE);
namespace = namespace == null ? (String) mapFlow.get("namespace") : namespace;
revision = revision == null ? (Integer) mapFlow.get("revision") : revision;
final boolean strictParsing) throws ConstraintViolationException, JsonProcessingException {
Map<String, Object> mapFlow = OBJECT_MAPPER.readValue(source, JacksonMapper.MAP_TYPE_REFERENCE);
namespace = namespace == null ? (String) mapFlow.get("namespace") : namespace;
revision = revision == null ? (Integer) mapFlow.get("revision") : revision;
mapFlow = innerInjectDefault(tenant, namespace, mapFlow, onlyVersions);
mapFlow = innerInjectDefault(tenant, namespace, mapFlow, onlyVersions);
FlowWithSource withDefault = YamlParser.parse(mapFlow, FlowWithSource.class, strictParsing);
FlowWithSource withDefault = YamlParser.parse(mapFlow, FlowWithSource.class, strictParsing);
// revision, tenants, and deleted are not in the 'source', so we copy them manually
FlowWithSource full = withDefault.toBuilder()
.tenantId(tenant)
.revision(revision)
.deleted(isDeleted)
.source(source)
.build();
// revision, tenants, and deleted are not in the 'source', so we copy them manually
FlowWithSource full = withDefault.toBuilder()
.tenantId(tenant)
.revision(revision)
.deleted(isDeleted)
.source(source)
.build();
if (tenant != null) {
// This is a hack to set the tenant in template tasks.
// When using the Template task, we need the tenant to fetch the Template from the database.
// However, as the task is executed on the Executor we cannot retrieve it from the tenant service and have no other options.
// So we save it at flow creation/updating time.
full.allTasksWithChilds().stream().filter(task -> task instanceof Template).forEach(task -> ((Template) task).setTenantId(tenant));
}
return full;
} catch (JsonProcessingException e) {
throw new KestraRuntimeException(e);
if (tenant != null) {
// This is a hack to set the tenant in template tasks.
// When using the Template task, we need the tenant to fetch the Template from the database.
// However, as the task is executed on the Executor we cannot retrieve it from the tenant service and have no other options.
// So we save it at flow creation/updating time.
full.allTasksWithChilds().stream().filter(task -> task instanceof Template).forEach(task -> ((Template) task).setTenantId(tenant));
}
return full;
}
@@ -576,7 +581,14 @@ public class PluginDefaultService {
@Deprecated(forRemoval = true, since = "0.20")
public Flow injectDefaults(Flow flow) throws ConstraintViolationException {
if (flow instanceof FlowWithSource flowWithSource) {
return this.injectAllDefaults(flowWithSource, false);
try {
return this.injectAllDefaults(flowWithSource, false);
} catch (FlowProcessingException e) {
if (e.getCause() instanceof ConstraintViolationException cve) {
throw cve;
}
throw new KestraRuntimeException(e);
}
}
Map<String, Object> mapFlow = NON_DEFAULT_OBJECT_MAPPER.convertValue(flow, JacksonMapper.MAP_TYPE_REFERENCE);

View File

@@ -1,12 +1,14 @@
package io.kestra.plugin.core.dashboard.chart;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.dashboards.ColumnDescriptor;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.charts.DataChart;
import io.kestra.plugin.core.dashboard.chart.bars.BarOption;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -15,9 +17,40 @@ import lombok.experimental.SuperBuilder;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@Plugin
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@EqualsAndHashCode
@Schema(
title = "Compare categorical data visually with bar charts."
)
@Plugin(
examples = {
@Example(
title = "Display a bar chart with with Executions per Namespace.",
full = true,
code = {
"charts:\n" +
"- id: executions_per_namespace_bars\n" +
"type: io.kestra.plugin.core.dashboard.chart.Bar\n" +
"chartOptions:\n" +
"displayName: Executions (per namespace)\n" +
"description: Executions count per namespace\n" +
"legend:\n" +
"enabled: true\n" +
"column: namespace\n" +
"data:\n" +
"type: io.kestra.plugin.core.dashboard.data.Executions\n" +
"columns:\n" +
"namespace:\n" +
"field: NAMESPACE\n" +
"state:\n" +
"field: STATE\n" +
"total:\n" +
"displayName: Execution\n" +
"agg: COUNT\n"
}
)
}
)
public class Bar<F extends Enum<F>, D extends DataFilter<F, ? extends ColumnDescriptor<F>>> extends DataChart<BarOption, D> {
@Override
public Integer minNumberOfAggregations() {

View File

@@ -1,9 +1,12 @@
package io.kestra.plugin.core.dashboard.chart;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.dashboards.ChartOption;
import io.kestra.core.models.dashboards.charts.Chart;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -12,9 +15,35 @@ import lombok.experimental.SuperBuilder;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@Plugin
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@EqualsAndHashCode
@Schema(
title = "Add context and insights with customizable Markdown text."
)
@Plugin(
examples = {
@Example(
title = "Display custom content in place with Markdown.",
full = true,
code = {
"charts:\n" +
"- id: markdown_insight\n" +
"type: io.kestra.plugin.core.dashboard.chart.Markdown\n" +
"chartOptions:\n" +
"displayName: Chart Insights\n" +
"description: How to interpret this chart\n" +
"content: \"## Execution Success Rate\n" +
"This chart displays the percentage of successful executions over time.\n" +
"- A **higher success rate** indicates stable and reliable workflows.\n" +
"- Sudden **drops** may signal issues in task execution or external dependencies.\n" +
"- Use this insight to identify trends and optimize performance.\"\n"
}
)
}
)
public class Markdown extends Chart<ChartOption> {
private String content;
}

View File

@@ -1,11 +1,14 @@
package io.kestra.plugin.core.dashboard.chart;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.dashboards.ColumnDescriptor;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.charts.DataChart;
import io.kestra.plugin.core.dashboard.chart.pies.PieOption;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -14,9 +17,37 @@ import lombok.experimental.SuperBuilder;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@Plugin
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@EqualsAndHashCode
@Schema(
title = "Show proportions and distributions using pie charts."
)
@Plugin(
examples = {
@Example(
title = "Display a pie chart with with Executions per State.",
full = true,
code = {
"charts:\n" +
"- id: executions_pie\n" +
"type: io.kestra.plugin.core.dashboard.chart.Pie\n" +
"chartOptions:\n" +
"displayName: Total Executions\n" +
"description: Total executions per state\n" +
"legend:\n" +
"enabled: true\n" +
"colorByColumn: state\n" +
"data:\n" +
"type: io.kestra.plugin.core.dashboard.data.Executions\n" +
"columns:\n" +
"state:\n" +
"field: STATE\n" +
"total:\n" +
"agg: COUNT\n"
}
)
}
)
public class Pie<F extends Enum<F>, D extends DataFilter<F, ? extends ColumnDescriptor<F>>> extends DataChart<PieOption, D> {
@Override
public Integer minNumberOfAggregations() {

View File

@@ -1,11 +1,14 @@
package io.kestra.plugin.core.dashboard.chart;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.charts.DataChart;
import io.kestra.plugin.core.dashboard.chart.tables.TableColumnDescriptor;
import io.kestra.plugin.core.dashboard.chart.tables.TableOption;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -14,8 +17,38 @@ import lombok.experimental.SuperBuilder;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@Plugin
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@EqualsAndHashCode
@Schema(
title = "Display structured data in a clear, sortable table."
)
@Plugin(
examples = {
@Example(
title = "Display a table with a Log count for each level by Namespace.",
full = true,
code = {
"charts:\n" +
"- id: table_logs\n" +
"type: io.kestra.plugin.core.dashboard.chart.Table\n" +
"chartOptions:\n" +
"displayName: Log count by level for filtered namespace\n" +
"data:\n" +
"type: io.kestra.plugin.core.dashboard.data.Logs\n" +
"columns:\n" +
"level:\n" +
"field: LEVEL\n" +
"count:\n" +
"agg: COUNT\n" +
"where:\n" +
"- field: NAMESPACE\n" +
"type: IN\n" +
"values:\n" +
"- dev_graph\n" +
"- prod_graph\n"
}
)
}
)
public class Table<F extends Enum<F>, D extends DataFilter<F, ? extends TableColumnDescriptor<F>>> extends DataChart<TableOption, D> {
}

View File

@@ -1,12 +1,15 @@
package io.kestra.plugin.core.dashboard.chart;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.dashboards.DataFilter;
import io.kestra.core.models.dashboards.charts.DataChart;
import io.kestra.core.validations.TimeSeriesChartValidation;
import io.kestra.plugin.core.dashboard.chart.timeseries.TimeSeriesColumnDescriptor;
import io.kestra.plugin.core.dashboard.chart.timeseries.TimeSeriesOption;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -15,10 +18,49 @@ import lombok.experimental.SuperBuilder;
@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
@Plugin
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@EqualsAndHashCode
@TimeSeriesChartValidation
@Schema(
title = "Track trends over time with dynamic time series charts."
)
@Plugin(
examples = {
@Example(
title = "Display a chart with Executions over the last week.",
full = true,
code = {
"charts:\n" +
"- id: executions_timeseries\n" +
"type: io.kestra.plugin.core.dashboard.chart.TimeSeries\n" +
"chartOptions:\n" +
"displayName: Executions\n" +
"description: Executions last week\n" +
"legend:\n" +
"enabled: true\n" +
"column: date\n" +
"colorByColumn: state\n" +
"data:\n" +
"type: io.kestra.plugin.core.dashboard.data.Executions\n" +
"columns:\n" +
"date:\n" +
"field: START_DATE\n" +
"displayName: Date\n" +
"state:\n" +
"field: STATE\n" +
"total:\n" +
"displayName: Executions\n" +
"agg: COUNT\n" +
"graphStyle: BARS\n" +
"duration:\n" +
"displayName: Duration\n" +
"field: DURATION\n" +
"agg: SUM\n" +
"graphStyle: LINES\n"
}
)
}
)
public class TimeSeries<F extends Enum<F>, D extends DataFilter<F, ? extends TimeSeriesColumnDescriptor<F>>> extends DataChart<TimeSeriesOption, D> {
@Override
public Integer minNumberOfAggregations() {

View File

@@ -22,7 +22,7 @@ import org.slf4j.event.Level;
@Getter
@NoArgsConstructor
@Schema(
title = "Log a message in the task logs.",
title = "Log a message in the task logs (Deprecated).",
description = "This task is deprecated, please use the `io.kestra.plugin.core.log.Log` task instead.",
deprecated = true
)

View File

@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
@Getter
@NoArgsConstructor
@Schema(
title = "Assert some conditions.",
title = "Assert some conditions to control task output data.",
description = "Used to control outputs data emitted from previous task on this execution."
)
@Plugin(

View File

@@ -31,7 +31,7 @@ import static io.kestra.core.utils.Rethrow.throwPredicate;
@Getter
@NoArgsConstructor
@Schema(
title = "List execution counts for a list of flow.",
title = "List execution counts for a list of flows.",
description = "This can be used to send an alert if a condition is met about execution counts."
)
@Plugin(

View File

@@ -30,7 +30,7 @@ import java.util.Optional;
@Getter
@NoArgsConstructor
@Schema(
title = "Exit the execution: terminate it in the state defined by the property `state`.",
title = "Terminate an execution in the state defined by the property state.",
description = "Note that if this execution has running tasks, for example in a parallel branch, the tasks will not be terminated except if `state` is set to `KILLED`."
)
@Plugin(

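As an illustration only, a hypothetical flow terminating the execution in a KILLED state. The state property is named in the title above; the type identifier io.kestra.plugin.core.execution.Exit is an assumption, since the class name is not visible in this hunk.

id: exit_killed
namespace: company.team

tasks:
  - id: warn_before_exit
    type: io.kestra.plugin.core.log.Log
    message: Terminating this execution

  - id: exit
    type: io.kestra.plugin.core.execution.Exit   # assumed type identifier
    state: KILLED                                # running parallel tasks are only terminated with KILLED, per the description above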
View File

@@ -23,7 +23,7 @@ import lombok.experimental.SuperBuilder;
@Getter
@NoArgsConstructor
@Schema(
title = "Fail the execution.",
title = "Intentionally fail the execution.",
description = "Used to fail the execution, for example, on a switch branch or on some conditions based on the execution context."
)
@Plugin(
@@ -32,82 +32,82 @@ import lombok.experimental.SuperBuilder;
full = true,
title = "Fail on a switch branch",
code = """
id: fail_on_switch
namespace: company.team
inputs:
  - id: param
    type: STRING
    required: true
tasks:
  - id: switch
    type: io.kestra.plugin.core.flow.Switch
    value: "{{inputs.param}}"
    cases:
      case1:
        - id: case1
          type: io.kestra.plugin.core.log.Log
          message: Case 1
      case2:
        - id: case2
          type: io.kestra.plugin.core.log.Log
          message: Case 2
      notexist:
        - id: fail
          type: io.kestra.plugin.core.execution.Fail
      default:
        - id: default
          type: io.kestra.plugin.core.log.Log
          message: default
"""
),
@Example(
full = true,
title = "Fail on a condition",
code = """
id: fail_on_condition
namespace: company.team
inputs:
  - name: param
    type: STRING
    required: true
tasks:
  - id: before
    type: io.kestra.plugin.core.debug.Echo
    format: I'm before the fail on condition
  - id: fail
    type: io.kestra.plugin.core.execution.Fail
    condition: '{{ inputs.param == "fail" }}'
  - id: after
    type: io.kestra.plugin.core.debug.Echo
    format: I'm after the fail on condition
"""
),
@Example(
full = true,
title = "Using errorLogs function to send error message to Slack",
code = """
id: error_logs
namespace: company.team
tasks:
  - id: fail
    type: io.kestra.plugin.core.execution.Fail
    errorMessage: Something went wrong, make sure to fix it asap!
errors:
  - id: slack
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "{{ secret('SLACK_WEBHOOK') }}"
    payload: |
      {
        "text": "Failure alert for flow `{{ flow.namespace }}.{{ flow.id }}` with ID `{{ execution.id }}`. Here is a bit more context about why the execution failed: `{{ errorLogs()[0]['message'] }}`"
      }
"""
)
},

View File

@@ -37,7 +37,8 @@ import java.util.Map;
@Getter
@NoArgsConstructor
@Schema(
title = "Resume a paused execution. By default, the task assumes that you want to resume the current `executionId`. If you want to programmatically resume an execution of another flow, make sure to define the `executionId`, `flowId`, and `namespace` properties explicitly. Using the `inputs` property, you can additionally pass custom `onResume` input values to the execution."
title = "Resume a paused execution.",
description = "By default, the task assumes that you want to resume the current `executionId`. If you want to programmatically resume an execution of another flow, make sure to define the `executionId`, `flowId`, and `namespace` properties explicitly. Using the `inputs` property, you can additionally pass custom `onResume` input values to the execution."
)
@Plugin(
examples = {

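A hedged sketch of resuming another flow's paused execution, using the executionId, flowId, namespace, and inputs properties named in the description above. The type identifier io.kestra.plugin.core.execution.Resume and the target flow are assumptions.

id: resume_other_flow
namespace: company.team

inputs:
  - id: paused_execution_id
    type: STRING

tasks:
  - id: resume
    type: io.kestra.plugin.core.execution.Resume   # assumed type identifier
    executionId: "{{ inputs.paused_execution_id }}"
    flowId: approval_flow                          # hypothetical target flow
    namespace: company.team
    inputs:
      reason: approved programmatically            # passed to the paused execution as onResume values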
View File

@@ -10,6 +10,7 @@ import io.kestra.core.models.executions.NextTaskRun;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.*;
import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
@@ -35,7 +36,7 @@ import java.util.stream.Stream;
@NoArgsConstructor
@DagTaskValidation
@Schema(
title = "Create a directed acyclic graph (DAG) of tasks without explicitly specifying the order in which the tasks need to run.",
title = "Create a DAG of tasks without explicitly specifying the order in which the tasks must run.",
description = "List your tasks and their dependencies, and Kestra will figure out the execution sequence.\n" +
"Each task can only depend on other tasks from the DAG task.\n" +
"For technical reasons, low-code interaction via UI forms is disabled for now when using this task."
@@ -93,8 +94,7 @@ public class Dag extends Task implements FlowableTask<VoidOutput> {
title = "Number of concurrent parallel tasks that can be running at any point in time.",
description = "If the value is `0`, no concurrency limit exists for the tasks in a DAG and all tasks that can run in parallel will start at the same time."
)
@PluginProperty
private final Integer concurrent = 0;
private final Property<Integer> concurrent = Property.of(0);
@Valid
@NotEmpty
@@ -171,7 +171,7 @@ public class Dag extends Task implements FlowableTask<VoidOutput> {
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
parentTaskRun,
this.concurrent,
runContext.render(this.concurrent).as(Integer.class).orElseThrow(),
this.tasks
);
}
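Since concurrent is now a dynamic Property<Integer> rendered through runContext.render(...) (see the change above), the value can presumably be templated as well as set literally. A sketch under that assumption; the task/dependsOn structure is taken from Kestra's Dag documentation rather than this hunk.

id: dag_concurrency
namespace: company.team

inputs:
  - id: max_parallel
    type: INT
    defaults: 2

tasks:
  - id: dag
    type: io.kestra.plugin.core.flow.Dag
    concurrent: "{{ inputs.max_parallel }}"   # 0 would remove the limit, per the description above
    tasks:
      - task:
          id: extract
          type: io.kestra.plugin.core.log.Log
          message: extract
      - task:
          id: load
          type: io.kestra.plugin.core.log.Log
          message: load
        dependsOn:
          - extract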

View File

@@ -10,6 +10,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.VoidOutput;
@@ -31,7 +32,7 @@ import java.util.Optional;
@Getter
@NoArgsConstructor
@Schema(
title = "For each value in the list, execute one or more tasks in parallel.",
title = "For each value in the list, execute one or more tasks in parallel (Deprecated).",
description = "This task is deprecated, please use the `io.kestra.plugin.core.flow.ForEach` task instead.\n\n" +
"The list of `tasks` will be executed for each item in parallel. " +
"The value must be a valid JSON string representing an array, e.g. a list of strings `[\"value1\", \"value2\"]` or a list of dictionaries `[{\"key\": \"value1\"}, {\"key\": \"value2\"}]`.\n" +
@@ -128,8 +129,7 @@ public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {
title = "Number of concurrent parallel tasks that can be running at any point in time.",
description = "If the value is `0`, no limit exist and all the tasks will start at the same time."
)
@PluginProperty
private final Integer concurrent = 0;
private final Property<Integer> concurrent = Property.of(0);
@NotNull
@PluginProperty(dynamic = true)
@@ -191,7 +191,7 @@ public class EachParallel extends Parallel implements FlowableTask<VoidOutput> {
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
parentTaskRun,
this.concurrent
runContext.render(this.concurrent).as(Integer.class).orElseThrow()
);
}
}

View File

@@ -34,7 +34,7 @@ import java.util.Optional;
@Getter
@NoArgsConstructor
@Schema(
title = "For each value in the list, execute one or more tasks sequentially.",
title = "For each value in the list, execute one or more tasks sequentially (Deprecated).",
description = "This task is deprecated, please use the `io.kestra.plugin.core.flow.ForEach` task instead.\n\n" +
"The list of `tasks` will be executed for each item sequentially. " +
"The value must be a valid JSON string representing an array, e.g. a list of strings `[\"value1\", \"value2\"]` or a list of dictionaries `[{\"key\": \"value1\"}, {\"key\": \"value2\"}]`. \n\n" +

View File

@@ -11,6 +11,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
@@ -69,12 +70,11 @@ import java.util.stream.Stream;
aliases = "io.kestra.core.tasks.flows.If"
)
public class If extends Task implements FlowableTask<If.Output> {
@PluginProperty(dynamic = true)
@Schema(
title = "The `If` condition which can be any expression that evaluates to a boolean value.",
description = "Boolean coercion allows 0, -0, null and '' to evaluate to false, all other values will evaluate to true."
)
private String condition;
private Property<String> condition;
@Valid
@PluginProperty
@@ -205,7 +205,7 @@ public class If extends Task implements FlowableTask<If.Output> {
}
private Boolean isTrue(RunContext runContext) throws IllegalVariableEvaluationException {
String rendered = runContext.render(condition);
String rendered = runContext.render(condition).as(String.class).orElse(null);
return TruthUtils.isTruthy(rendered);
}
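To make the boolean coercion above concrete, a minimal sketch: a value of 0, -0, null or '' renders as falsy and the else branch runs. The then/else properties come from the If task's documented schema and are not shown in this hunk.

id: if_truthy
namespace: company.team

inputs:
  - id: flag
    type: STRING
    defaults: "0"

tasks:
  - id: if
    type: io.kestra.plugin.core.flow.If
    condition: "{{ inputs.flag }}"   # 0, -0, null and '' are coerced to false, everything else to true
    then:
      - id: when_true
        type: io.kestra.plugin.core.log.Log
        message: condition was truthy
    else:
      - id: when_false
        type: io.kestra.plugin.core.log.Log
        message: condition was falsy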

View File

@@ -12,6 +12,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.AbstractGraph;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.hierarchies.RelationType;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.models.tasks.Task;
@@ -90,19 +91,17 @@ public class LoopUntil extends Task implements FlowableTask<LoopUntil.Output> {
private List<Task> tasks;
@NotNull
@PluginProperty(dynamic = true)
@Schema(
title = "The condition expression that should evaluate to `true` or `false`.",
description = "Boolean coercion allows 0, -0, null and '' to evaluate to false; all other values will evaluate to true."
)
private String condition;
private Property<String> condition;
@Schema(
title = "If set to `true`, the task run will end in a failed state once the `maxIterations` or `maxDuration` are reached."
)
@Builder.Default
@PluginProperty
private Boolean failOnMaxReached = false;
private Property<Boolean> failOnMaxReached = Property.of(false);
@Schema(
title = "Check the frequency configuration."
@@ -159,16 +158,16 @@ public class LoopUntil extends Task implements FlowableTask<LoopUntil.Output> {
public Instant nextExecutionDate(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
if (!this.reachedMaximums(runContext, execution, parentTaskRun, false)) {
String continueLoop = runContext.render(this.condition);
String continueLoop = runContext.render(this.condition).as(String.class).orElse(null);
if (!TruthUtils.isTruthy(continueLoop)) {
return Instant.now().plus(this.checkFrequency.interval);
return Instant.now().plus(runContext.render(this.getCheckFrequency().getInterval()).as(Duration.class).orElseThrow());
}
}
return null;
}
private boolean reachedMaximums(RunContext runContext, Execution execution, TaskRun parentTaskRun, Boolean printLog) {
private boolean reachedMaximums(RunContext runContext, Execution execution, TaskRun parentTaskRun, Boolean printLog) throws IllegalVariableEvaluationException {
Logger logger = runContext.logger();
if (!this.childTaskRunExecuted(execution, parentTaskRun)) {
@@ -178,14 +177,18 @@ public class LoopUntil extends Task implements FlowableTask<LoopUntil.Output> {
Integer iterationCount = Optional.ofNullable(parentTaskRun.getOutputs())
.map(outputs -> (Integer) outputs.get("iterationCount"))
.orElse(0);
if (this.checkFrequency.maxIterations != null && iterationCount != null && iterationCount > this.checkFrequency.maxIterations) {
Optional<Integer> maxIterations = runContext.render(this.getCheckFrequency().getMaxIterations()).as(Integer.class);
if (maxIterations.isPresent() && iterationCount != null && iterationCount > maxIterations.get()) {
if (printLog) {logger.warn("Max iterations reached");}
return true;
}
Instant creationDate = parentTaskRun.getState().getHistories().getFirst().getDate();
if (this.checkFrequency.maxDuration != null &&
creationDate != null && creationDate.plus(this.checkFrequency.maxDuration).isBefore(Instant.now())) {
Optional<Duration> maxDuration = runContext.render(this.getCheckFrequency().getMaxDuration()).as(Duration.class);
if (maxDuration.isPresent()
&& creationDate != null
&& creationDate.plus(maxDuration.get()).isBefore(Instant.now())) {
if (printLog) {logger.warn("Max duration reached");}
return true;
@@ -201,7 +204,10 @@ public class LoopUntil extends Task implements FlowableTask<LoopUntil.Output> {
return Optional.empty();
}
if (childTaskExecuted && this.reachedMaximums(runContext, execution, parentTaskRun, true) && this.failOnMaxReached) {
if (childTaskExecuted
&& this.reachedMaximums(runContext, execution, parentTaskRun, true)
&& Boolean.TRUE.equals(runContext.render(this.failOnMaxReached).as(Boolean.class).orElseThrow())
) {
return Optional.of(State.Type.FAILED);
}
@@ -269,21 +275,18 @@ public class LoopUntil extends Task implements FlowableTask<LoopUntil.Output> {
title = "Maximum count of iterations."
)
@Builder.Default
@PluginProperty
private Integer maxIterations = 100;
private Property<Integer> maxIterations = Property.of(100);
@Schema(
title = "Maximum duration of the task."
)
@Builder.Default
@PluginProperty
private Duration maxDuration = Duration.ofHours(1);
private Property<Duration> maxDuration = Property.of(Duration.ofHours(1));
@Schema(
title = "Interval between each iteration."
)
@Builder.Default
@PluginProperty
private Duration interval = Duration.ofSeconds(1);
private Property<Duration> interval = Property.of(Duration.ofSeconds(1));
}
}
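The checkFrequency settings above (interval, maxIterations, maxDuration) and failOnMaxReached are now dynamic properties. A sketch of how they could be set; the child task and the condition expression are purely illustrative.

id: loop_until_ready
namespace: company.team

tasks:
  - id: loop
    type: io.kestra.plugin.core.flow.LoopUntil
    condition: "{{ outputs.check.value == 'ready' }}"   # hypothetical condition
    failOnMaxReached: true
    checkFrequency:
      interval: PT10S
      maxIterations: 30
      maxDuration: PT10M
    tasks:
      - id: check
        type: io.kestra.plugin.core.debug.Return
        format: ready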

View File

@@ -2,6 +2,7 @@ package io.kestra.plugin.core.flow;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
@@ -110,8 +111,7 @@ public class Parallel extends Task implements FlowableTask<VoidOutput> {
title = "Number of concurrent parallel tasks that can be running at any point in time.",
description = "If the value is `0`, no limit exist and all tasks will start at the same time."
)
@PluginProperty
private final Integer concurrent = 0;
private final Property<Integer> concurrent = Property.of(0);
@Valid
@PluginProperty
@@ -173,7 +173,7 @@ public class Parallel extends Task implements FlowableTask<VoidOutput> {
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
FlowableUtils.resolveTasks(this._finally, parentTaskRun),
parentTaskRun,
this.concurrent
runContext.render(this.concurrent).as(Integer.class).orElseThrow()
);
}
}
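A sketch of the concurrent limit described above. With the migration to Property<Integer>, a templated value is presumably also accepted; a literal is shown here.

id: parallel_limited
namespace: company.team

tasks:
  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    concurrent: 2                   # 0 removes the limit, per the description above
    tasks:
      - id: task_1
        type: io.kestra.plugin.core.log.Log
        message: first
      - id: task_2
        type: io.kestra.plugin.core.log.Log
        message: second
      - id: task_3
        type: io.kestra.plugin.core.log.Log
        message: third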

View File

@@ -22,8 +22,10 @@ import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.ListUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
@@ -40,9 +42,9 @@ import java.util.stream.Stream;
@Getter
@NoArgsConstructor
@Schema(
title = "Pause the current execution and wait for a manual approval (either by humans or other automated processes).",
description = "All tasks downstream from the Pause task will be put on hold until the execution is manually resumed from the UI.\n\n" +
"The Execution will be in a Paused state, and you can either manually resume it by clicking on the \"Resume\" button in the UI or by calling the POST API endpoint `/api/v1/executions/{executionId}/resume`. The execution can also be resumed automatically after a timeout."
title = "Pause the current execution and wait for approval (either by humans or other automated processes).",
description = "All tasks downstream from the Pause task will be put on hold until the execution is manually resumed from the UI.\n\n" +
"The Execution will be in a Paused state, and you can either manually resume it by clicking on the \"Resume\" button in the UI or by calling the POST API endpoint `/api/v1/executions/{executionId}/resume`. The execution can also be resumed automatically after the `pauseDuration`."
)
@Plugin(
examples = {
@@ -129,6 +131,24 @@ import java.util.stream.Stream;
type: io.kestra.plugin.core.log.Log
message: Status is {{ outputs.wait_for_approval.onResume.reason }}. Process finished with {{ outputs.approve.body }}
"""
),
@Example(
title = "Pause the execution and set the execution in WARNING if it has not been resumed after 5 minutes",
full = true,
code = """
id: pause_warn
namespace: company.team
tasks:
  - id: pause
    type: io.kestra.plugin.core.flow.Pause
    pauseDuration: PT5M
    behavior: WARN
  - id: post_resume
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} started on {{ taskrun.startDate }} after the Pause"
"""
)
},
aliases = "io.kestra.core.tasks.flows.Pause"
@@ -136,17 +156,38 @@ import java.util.stream.Stream;
public class Pause extends Task implements FlowableTask<Pause.Output> {
@Schema(
title = "Duration of the pause — useful if you want to pause the execution for a fixed amount of time.",
description = "The delay is a string in the [ISO 8601 Duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) format, e.g. `PT1H` for 1 hour, `PT30M` for 30 minutes, `PT10S` for 10 seconds, `P1D` for 1 day, etc. If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API.",
description = "**Deprecated**: use `pauseDuration` instead.",
implementation = Duration.class
)
@Deprecated
private Property<Duration> delay;
@Deprecated
public void setDelay(Property<Duration> delay) {
this.delay = delay;
this.pauseDuration = delay;
}
@Schema(
title = "Timeout of the pause — useful to avoid never-ending workflows in a human-in-the-loop scenario. For example, if you want to pause the execution until a human validates some data generated in a previous task, you can set a timeout of e.g. 24 hours. If no manual approval happens within 24 hours, the execution will automatically resume without a prior data validation.",
description = "If no delay and no timeout are configured, the execution will never end until it's manually resumed from the UI or API.",
title = "Duration of the pause. If not set the task will wait forever to be manually resumed except if a timeout is set, in this case, the timeout will be honored.",
description = "The duration is a string in the [ISO 8601 Duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) format, e.g. `PT1H` for 1 hour, `PT30M` for 30 minutes, `PT10S` for 10 seconds, `P1D` for 1 day, etc. If no pauseDuration and no timeout are configured, the execution will never end until it's manually resumed from the UI or API.",
implementation = Duration.class
)
private Property<Duration> timeout;
private Property<Duration> pauseDuration;
@Schema(
title = "Pause behavior, by default RESUME. What happens when a pause task reach its duration.",
description = """
Tasks that are resumed before the duration (for example, from the UI) will not use the behavior property but will always success.
Possible values are:
- RESUME: continue with the execution
- WARN: ends the Pause task in WARNING and continue with the execution
- FAIL: fail the Pause task
- CANCEL: cancel the execution"""
)
@NotNull
@Builder.Default
private Property<Behavior> behavior = Property.of(Behavior.RESUME);
@Valid
@Schema(
@@ -230,17 +271,34 @@ public class Pause extends Task implements FlowableTask<Pause.Output> {
parentTaskRun.getState().getHistories().stream().noneMatch(history -> history.getState() == State.Type.PAUSED);
}
// This method is only called when there are subtasks
@Override
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
if (this.needPause(parentTaskRun)) {
return Optional.of(State.Type.PAUSED);
}
if (this.tasks == null || this.tasks.isEmpty()) {
return Optional.of(State.Type.SUCCESS);
}
return FlowableTask.super.resolveState(runContext, execution, parentTaskRun);
Behavior behavior = runContext.render(this.behavior).as(Behavior.class).orElse(Behavior.RESUME);
return switch (behavior) {
case Behavior.RESUME -> {
// yield SUCCESS or the final flowable task state
if (ListUtils.isEmpty(this.tasks)) {
yield Optional.of(State.Type.SUCCESS);
} else {
yield FlowableTask.super.resolveState(runContext, execution, parentTaskRun);
}
}
case Behavior.WARN -> {
// yield WARNING or the final flowable task state, if the flowable ends in SUCCESS, yield WARNING
if (ListUtils.isEmpty(this.tasks)) {
yield Optional.of(State.Type.WARNING);
} else {
Optional<State.Type> finalState = FlowableTask.super.resolveState(runContext, execution, parentTaskRun);
yield finalState.map(state -> state == State.Type.SUCCESS ? State.Type.WARNING : state);
}
}
case Behavior.CANCEL, Behavior.FAIL -> throw new IllegalArgumentException("The " + behavior + " behavior cannot be handled at this stage, this is certainly a bug!");
};
}
public Map<String, Object> generateOutputs(Map<String, Object> inputs) {
@@ -256,4 +314,21 @@ public class Pause extends Task implements FlowableTask<Pause.Output> {
public static class Output implements io.kestra.core.models.tasks.Output {
private Map<String, Object> onResume;
}
public enum Behavior {
RESUME(State.Type.RUNNING),
WARN(State.Type.WARNING),
CANCEL(State.Type.CANCELLED),
FAIL(State.Type.FAILED);
private final State.Type executionState;
Behavior(State.Type executionState) {
this.executionState = executionState;
}
public State.Type mapToState() {
return this.executionState;
}
}
}
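Because the deprecated delay setter now writes into pauseDuration (see above), existing flows keep working while new flows can spell the property out explicitly. A minimal sketch using one of the behavior values listed in the new schema.

id: pause_fail_after_timeout
namespace: company.team

tasks:
  - id: pause
    type: io.kestra.plugin.core.flow.Pause
    pauseDuration: PT15M   # the deprecated delay property is mapped onto this field
    behavior: FAIL         # fail the Pause task if nobody resumes within 15 minutes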

View File

@@ -16,6 +16,7 @@ import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.GraphUtils;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
import lombok.*;
import lombok.experimental.SuperBuilder;
@@ -30,7 +31,7 @@ import java.util.stream.Stream;
@Getter
@NoArgsConstructor
@Schema(
title = "Run tasks sequentially, one after the other, in the order they are defined.",
title = "Run tasks sequentially in the order they are defined.",
description = "Used to visually group tasks."
)
@Plugin(
@@ -76,7 +77,7 @@ public class Sequential extends Task implements FlowableTask<VoidOutput> {
@Valid
@PluginProperty
// FIXME -> issue with Pause @NotEmpty
@NotEmpty(message = "The 'tasks' property cannot be empty")
private List<Task> tasks;
@Override

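Given the new @NotEmpty validation above, a Sequential task must declare at least one child task. A minimal sketch:

id: sequential_group
namespace: company.team

tasks:
  - id: group
    type: io.kestra.plugin.core.flow.Sequential
    tasks:                          # must not be empty, per the new validation
      - id: first
        type: io.kestra.plugin.core.log.Log
        message: first
      - id: second
        type: io.kestra.plugin.core.log.Log
        message: second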
Some files were not shown because too many files have changed in this diff.