Compare commits

...

94 Commits

Author SHA1 Message Date
Loïc Mathieu
63e11c7d94 chore(system): remove unused deleted column in logs and metrics 2025-12-18 18:07:02 +01:00
Loïc Mathieu
11e199da33 chore(system): implement soft deletion consistently across entities 2025-12-18 18:07:02 +01:00
Loïc Mathieu
5d5165b7b9 fix(test): flag flowConcurrencyKilled() test as flaky 2025-12-18 18:04:01 +01:00
Loïc Mathieu
44d0c10713 fix(api): add netty-codec-multipart-vintage
This should fix the multipart codec issue of Netty.

Fixes #9743
2025-12-18 17:12:55 +01:00
Loïc Mathieu
167734e32a chore(deps): upgrade Micronaut to 4.10.5
Closes https://github.com/kestra-io/kestra/pull/13713
2025-12-18 17:12:55 +01:00
Roman Acevedo
24e61c81c0 feat(blueprints): impl templated flow blueprints
# Conflicts:
#	core/src/main/java/io/kestra/core/serializers/YamlParser.java
2025-12-18 15:57:17 +01:00
brian.mulier
379764a033 fix(ns-files): FilesPurgeBehavior had a wrong possible subtype due to a wrong import
closes https://github.com/kestra-io/kestra/issues/13748
2025-12-18 15:48:11 +01:00
brian.mulier
d55dd275c3 fix(core): Property rendering had issues deserializing some @JsonSubTypes
part of https://github.com/kestra-io/kestra/issues/13748
2025-12-18 15:48:11 +01:00
mustafatarek
f409657e8a feat(core): improve exception handling and validation with Inputs/Outputs
- Added InputOutputValidationException to represent Inputs/Outputs
  validation issues and added handler to it in ErrorsController
- Added support for throwing multiple constraint violations for the same
  input
- Added support for throwing multiple constraints at MultiselectInput
- Refactored exception handling at FlowInputOutput
- Added merge() function to combine constraint violation messages and
  added test for it at InputsTest
- Fixed the failing tests
2025-12-18 15:44:34 +01:00
GitHub Action
22f0b3ffdf chore(core): localize to languages other than English
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.
2025-12-18 13:05:14 +01:00
dependabot[bot]
0d99dc6862 build(deps): bump actions/upload-artifact from 5 to 6
Bumps [actions/upload-artifact](https://github.com/actions/upload-artifact) from 5 to 6.
- [Release notes](https://github.com/actions/upload-artifact/releases)
- [Commits](https://github.com/actions/upload-artifact/compare/v5...v6)

---
updated-dependencies:
- dependency-name: actions/upload-artifact
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-18 11:36:46 +01:00
Loïc Mathieu
fd3adc48b8 fix(ui): rephrase "kill parents" to "kill current"
This has always been "kill current" / "kill current and subflows", as we never kill parent executions; the kill cascades downward and never goes back up to a parent.

Part-of: #12557
2025-12-18 11:34:47 +01:00
YannC
1a8a47c8cd fix: Make sure parentTaskRun attempts are also set to Killed (#13736)
* fix: Make sure parentTaskRun attempts are also set to Killed

* test: added a test to check the correct behavior
2025-12-18 11:08:44 +01:00
Loïc Mathieu
7ea95f393e feat(execution): add a system.from label
Closes https://github.com/kestra-io/kestra-ee/issues/4699
2025-12-17 15:49:33 +01:00
Piyush Bhaskar
6935900699 fix(core): add a no-op update function to oss store to initialize update (#13732) 2025-12-17 19:47:40 +05:30
Saif M
0bc8e8d74a fix(flow): improve exception handling with a clear error message (#13674)
* fix: improved error handling

* including the line

* added tests

* added unit tests
2025-12-17 14:26:53 +01:00
dependabot[bot]
7f77b24ae0 build(deps): bump com.google.cloud:libraries-bom from 26.72.0 to 26.73.0
Bumps [com.google.cloud:libraries-bom](https://github.com/googleapis/java-cloud-bom) from 26.72.0 to 26.73.0.
- [Release notes](https://github.com/googleapis/java-cloud-bom/releases)
- [Changelog](https://github.com/googleapis/java-cloud-bom/blob/main/release-please-config.json)
- [Commits](https://github.com/googleapis/java-cloud-bom/compare/v26.72.0...v26.73.0)

---
updated-dependencies:
- dependency-name: com.google.cloud:libraries-bom
  dependency-version: 26.73.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-17 10:14:55 +01:00
dependabot[bot]
ec6820dc25 build(deps): bump org.aspectj:aspectjweaver from 1.9.25 to 1.9.25.1
Bumps [org.aspectj:aspectjweaver](https://github.com/eclipse/org.aspectj) from 1.9.25 to 1.9.25.1.
- [Release notes](https://github.com/eclipse/org.aspectj/releases)
- [Commits](https://github.com/eclipse/org.aspectj/commits)

---
updated-dependencies:
- dependency-name: org.aspectj:aspectjweaver
  dependency-version: 1.9.25.1
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-17 09:51:03 +01:00
dependabot[bot]
d94193c143 build(deps): bump software.amazon.awssdk.crt:aws-crt
Bumps [software.amazon.awssdk.crt:aws-crt](https://github.com/awslabs/aws-crt-java) from 0.40.3 to 0.41.0.
- [Release notes](https://github.com/awslabs/aws-crt-java/releases)
- [Commits](https://github.com/awslabs/aws-crt-java/compare/v0.40.3...v0.41.0)

---
updated-dependencies:
- dependency-name: software.amazon.awssdk.crt:aws-crt
  dependency-version: 0.41.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-17 09:50:42 +01:00
dependabot[bot]
c9628047fa build(deps): bump io.qameta.allure:allure-bom from 2.31.0 to 2.32.0
Bumps [io.qameta.allure:allure-bom](https://github.com/allure-framework/allure-java) from 2.31.0 to 2.32.0.
- [Release notes](https://github.com/allure-framework/allure-java/releases)
- [Commits](https://github.com/allure-framework/allure-java/compare/2.31.0...2.32.0)

---
updated-dependencies:
- dependency-name: io.qameta.allure:allure-bom
  dependency-version: 2.32.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-17 09:50:13 +01:00
dependabot[bot]
4cbc069af4 build(deps): bump nl.basjes.gitignore:gitignore-reader
Bumps [nl.basjes.gitignore:gitignore-reader](https://github.com/nielsbasjes/codeowners) from 1.13.0 to 1.14.1.
- [Release notes](https://github.com/nielsbasjes/codeowners/releases)
- [Changelog](https://github.com/nielsbasjes/codeowners/blob/main/CHANGELOG.md)
- [Commits](https://github.com/nielsbasjes/codeowners/compare/v1.13.0...v1.14.1)

---
updated-dependencies:
- dependency-name: nl.basjes.gitignore:gitignore-reader
  dependency-version: 1.14.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-17 09:49:11 +01:00
dependabot[bot]
eabe573fe6 build(deps): bump software.amazon.awssdk:bom from 2.40.5 to 2.40.10
Bumps software.amazon.awssdk:bom from 2.40.5 to 2.40.10.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.40.10
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-17 09:46:17 +01:00
dependabot[bot]
ecd64617c3 build(deps): bump org.testcontainers:junit-jupiter from 1.21.3 to 1.21.4
Bumps [org.testcontainers:junit-jupiter](https://github.com/testcontainers/testcontainers-java) from 1.21.3 to 1.21.4.
- [Release notes](https://github.com/testcontainers/testcontainers-java/releases)
- [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md)
- [Commits](https://github.com/testcontainers/testcontainers-java/compare/1.21.3...1.21.4)

---
updated-dependencies:
- dependency-name: org.testcontainers:junit-jupiter
  dependency-version: 1.21.4
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-17 09:45:26 +01:00
dependabot[bot]
a5650bca0f build(deps): bump org.sonarqube from 7.2.0.6526 to 7.2.1.6560
Bumps org.sonarqube from 7.2.0.6526 to 7.2.1.6560.

---
updated-dependencies:
- dependency-name: org.sonarqube
  dependency-version: 7.2.1.6560
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-17 09:44:58 +01:00
dependabot[bot]
ed59e262d4 build(deps): bump org.apache.logging.log4j:log4j-to-slf4j
Bumps org.apache.logging.log4j:log4j-to-slf4j from 2.25.2 to 2.25.3.

---
updated-dependencies:
- dependency-name: org.apache.logging.log4j:log4j-to-slf4j
  dependency-version: 2.25.3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-17 09:44:30 +01:00
dependabot[bot]
a5f9d54f7d build(deps): bump io.pebbletemplates:pebble from 4.0.0 to 4.1.0
Bumps [io.pebbletemplates:pebble](https://github.com/PebbleTemplates/pebble) from 4.0.0 to 4.1.0.
- [Release notes](https://github.com/PebbleTemplates/pebble/releases)
- [Changelog](https://github.com/PebbleTemplates/pebble/blob/master/release.properties)
- [Commits](https://github.com/PebbleTemplates/pebble/compare/4.0.0...4.1.0)

---
updated-dependencies:
- dependency-name: io.pebbletemplates:pebble
  dependency-version: 4.1.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-12-17 09:38:13 +01:00
Aaryan meena
47f4f43198 refactor(core): remove usage of unnecessary i18n composable (#13686)
Closes https://github.com/kestra-io/kestra/issues/13640.

Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-12-17 08:23:27 +01:00
dependabot[bot]
5d31c97f7f build(deps): bump the minor group in /ui with 6 updates (#13725)
Bumps the minor group in /ui with 6 updates:

| Package | From | To |
| --- | --- | --- |
| [posthog-js](https://github.com/PostHog/posthog-js) | `1.304.0` | `1.308.0` |
| [shiki](https://github.com/shikijs/shiki/tree/HEAD/packages/shiki) | `3.19.0` | `3.20.0` |
| [@shikijs/markdown-it](https://github.com/shikijs/shiki/tree/HEAD/packages/markdown-it) | `3.19.0` | `3.20.0` |
| [@typescript-eslint/parser](https://github.com/typescript-eslint/typescript-eslint/tree/HEAD/packages/parser) | `8.49.0` | `8.50.0` |
| [sass](https://github.com/sass/dart-sass) | `1.96.0` | `1.97.0` |
| [typescript-eslint](https://github.com/typescript-eslint/typescript-eslint/tree/HEAD/packages/typescript-eslint) | `8.49.0` | `8.50.0` |


Updates `posthog-js` from 1.304.0 to 1.308.0
- [Release notes](https://github.com/PostHog/posthog-js/releases)
- [Changelog](https://github.com/PostHog/posthog-js/blob/main/CHANGELOG.md)
- [Commits](https://github.com/PostHog/posthog-js/compare/posthog-js@1.304.0...posthog-js@1.308.0)

Updates `shiki` from 3.19.0 to 3.20.0
- [Release notes](https://github.com/shikijs/shiki/releases)
- [Commits](https://github.com/shikijs/shiki/commits/v3.20.0/packages/shiki)

Updates `@shikijs/markdown-it` from 3.19.0 to 3.20.0
- [Release notes](https://github.com/shikijs/shiki/releases)
- [Commits](https://github.com/shikijs/shiki/commits/v3.20.0/packages/markdown-it)

Updates `@typescript-eslint/parser` from 8.49.0 to 8.50.0
- [Release notes](https://github.com/typescript-eslint/typescript-eslint/releases)
- [Changelog](https://github.com/typescript-eslint/typescript-eslint/blob/main/packages/parser/CHANGELOG.md)
- [Commits](https://github.com/typescript-eslint/typescript-eslint/commits/v8.50.0/packages/parser)

Updates `sass` from 1.96.0 to 1.97.0
- [Release notes](https://github.com/sass/dart-sass/releases)
- [Changelog](https://github.com/sass/dart-sass/blob/main/CHANGELOG.md)
- [Commits](https://github.com/sass/dart-sass/compare/1.96.0...1.97.0)

Updates `typescript-eslint` from 8.49.0 to 8.50.0
- [Release notes](https://github.com/typescript-eslint/typescript-eslint/releases)
- [Changelog](https://github.com/typescript-eslint/typescript-eslint/blob/main/packages/typescript-eslint/CHANGELOG.md)
- [Commits](https://github.com/typescript-eslint/typescript-eslint/commits/v8.50.0/packages/typescript-eslint)

---
updated-dependencies:
- dependency-name: posthog-js
  dependency-version: 1.308.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: shiki
  dependency-version: 3.20.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: "@shikijs/markdown-it"
  dependency-version: 3.20.0
  dependency-type: direct:development
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: "@typescript-eslint/parser"
  dependency-version: 8.50.0
  dependency-type: direct:development
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: sass
  dependency-version: 1.97.0
  dependency-type: direct:development
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: typescript-eslint
  dependency-version: 8.50.0
  dependency-type: direct:development
  update-type: version-update:semver-minor
  dependency-group: minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-17 08:18:19 +01:00
dependabot[bot]
f8107285c4 build(deps): bump the patch group in /ui with 5 updates (#13726)
Bumps the patch group in /ui with 5 updates:

| Package | From | To |
| --- | --- | --- |
| [vue-router](https://github.com/vuejs/router) | `4.6.3` | `4.6.4` |
| [@eslint/js](https://github.com/eslint/eslint/tree/HEAD/packages/js) | `9.39.1` | `9.39.2` |
| [@vitejs/plugin-vue](https://github.com/vitejs/vite-plugin-vue/tree/HEAD/packages/plugin-vue) | `6.0.2` | `6.0.3` |
| [eslint](https://github.com/eslint/eslint) | `9.39.1` | `9.39.2` |
| [rolldown-vite](https://github.com/vitejs/rolldown-vite/tree/HEAD/packages/vite) | `7.2.10` | `7.2.11` |


Updates `vue-router` from 4.6.3 to 4.6.4
- [Release notes](https://github.com/vuejs/router/releases)
- [Commits](https://github.com/vuejs/router/compare/v4.6.3...v4.6.4)

Updates `@eslint/js` from 9.39.1 to 9.39.2
- [Release notes](https://github.com/eslint/eslint/releases)
- [Commits](https://github.com/eslint/eslint/commits/v9.39.2/packages/js)

Updates `@vitejs/plugin-vue` from 6.0.2 to 6.0.3
- [Release notes](https://github.com/vitejs/vite-plugin-vue/releases)
- [Changelog](https://github.com/vitejs/vite-plugin-vue/blob/main/packages/plugin-vue/CHANGELOG.md)
- [Commits](https://github.com/vitejs/vite-plugin-vue/commits/plugin-vue@6.0.3/packages/plugin-vue)

Updates `eslint` from 9.39.1 to 9.39.2
- [Release notes](https://github.com/eslint/eslint/releases)
- [Commits](https://github.com/eslint/eslint/compare/v9.39.1...v9.39.2)

Updates `rolldown-vite` from 7.2.10 to 7.2.11
- [Release notes](https://github.com/vitejs/rolldown-vite/releases)
- [Changelog](https://github.com/vitejs/rolldown-vite/blob/rolldown-vite/packages/vite/CHANGELOG.md)
- [Commits](https://github.com/vitejs/rolldown-vite/commits/v7.2.11/packages/vite)

---
updated-dependencies:
- dependency-name: vue-router
  dependency-version: 4.6.4
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch
- dependency-name: "@eslint/js"
  dependency-version: 9.39.2
  dependency-type: direct:development
  update-type: version-update:semver-patch
  dependency-group: patch
- dependency-name: "@vitejs/plugin-vue"
  dependency-version: 6.0.3
  dependency-type: direct:development
  update-type: version-update:semver-patch
  dependency-group: patch
- dependency-name: eslint
  dependency-version: 9.39.2
  dependency-type: direct:development
  update-type: version-update:semver-patch
  dependency-group: patch
- dependency-name: rolldown-vite
  dependency-version: 7.2.11
  dependency-type: direct:development
  update-type: version-update:semver-patch
  dependency-group: patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-17 08:13:30 +01:00
dependabot[bot]
8dc8dc1796 build(deps): bump the build group in /ui with 9 updates (#13723)
Bumps the build group in /ui with 9 updates:

| Package | From | To |
| --- | --- | --- |
| [@esbuild/darwin-arm64](https://github.com/evanw/esbuild) | `0.27.1` | `0.27.2` |
| [@esbuild/darwin-x64](https://github.com/evanw/esbuild) | `0.27.1` | `0.27.2` |
| [@esbuild/linux-x64](https://github.com/evanw/esbuild) | `0.27.1` | `0.27.2` |
| [@rollup/rollup-darwin-arm64](https://github.com/rollup/rollup) | `4.53.3` | `4.53.5` |
| [@rollup/rollup-darwin-x64](https://github.com/rollup/rollup) | `4.53.3` | `4.53.5` |
| [@rollup/rollup-linux-x64-gnu](https://github.com/rollup/rollup) | `4.53.3` | `4.53.5` |
| [@swc/core-darwin-arm64](https://github.com/swc-project/swc) | `1.15.3` | `1.15.5` |
| [@swc/core-darwin-x64](https://github.com/swc-project/swc) | `1.15.3` | `1.15.5` |
| [@swc/core-linux-x64-gnu](https://github.com/swc-project/swc) | `1.15.3` | `1.15.5` |


Updates `@esbuild/darwin-arm64` from 0.27.1 to 0.27.2
- [Release notes](https://github.com/evanw/esbuild/releases)
- [Changelog](https://github.com/evanw/esbuild/blob/main/CHANGELOG.md)
- [Commits](https://github.com/evanw/esbuild/compare/v0.27.1...v0.27.2)

Updates `@esbuild/darwin-x64` from 0.27.1 to 0.27.2
- [Release notes](https://github.com/evanw/esbuild/releases)
- [Changelog](https://github.com/evanw/esbuild/blob/main/CHANGELOG.md)
- [Commits](https://github.com/evanw/esbuild/compare/v0.27.1...v0.27.2)

Updates `@esbuild/linux-x64` from 0.27.1 to 0.27.2
- [Release notes](https://github.com/evanw/esbuild/releases)
- [Changelog](https://github.com/evanw/esbuild/blob/main/CHANGELOG.md)
- [Commits](https://github.com/evanw/esbuild/compare/v0.27.1...v0.27.2)

Updates `@rollup/rollup-darwin-arm64` from 4.53.3 to 4.53.5
- [Release notes](https://github.com/rollup/rollup/releases)
- [Changelog](https://github.com/rollup/rollup/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rollup/rollup/compare/v4.53.3...v4.53.5)

Updates `@rollup/rollup-darwin-x64` from 4.53.3 to 4.53.5
- [Release notes](https://github.com/rollup/rollup/releases)
- [Changelog](https://github.com/rollup/rollup/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rollup/rollup/compare/v4.53.3...v4.53.5)

Updates `@rollup/rollup-linux-x64-gnu` from 4.53.3 to 4.53.5
- [Release notes](https://github.com/rollup/rollup/releases)
- [Changelog](https://github.com/rollup/rollup/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rollup/rollup/compare/v4.53.3...v4.53.5)

Updates `@swc/core-darwin-arm64` from 1.15.3 to 1.15.5
- [Release notes](https://github.com/swc-project/swc/releases)
- [Changelog](https://github.com/swc-project/swc/blob/main/CHANGELOG.md)
- [Commits](https://github.com/swc-project/swc/compare/v1.15.3...v1.15.5)

Updates `@swc/core-darwin-x64` from 1.15.3 to 1.15.5
- [Release notes](https://github.com/swc-project/swc/releases)
- [Changelog](https://github.com/swc-project/swc/blob/main/CHANGELOG.md)
- [Commits](https://github.com/swc-project/swc/compare/v1.15.3...v1.15.5)

Updates `@swc/core-linux-x64-gnu` from 1.15.3 to 1.15.5
- [Release notes](https://github.com/swc-project/swc/releases)
- [Changelog](https://github.com/swc-project/swc/blob/main/CHANGELOG.md)
- [Commits](https://github.com/swc-project/swc/compare/v1.15.3...v1.15.5)

---
updated-dependencies:
- dependency-name: "@esbuild/darwin-arm64"
  dependency-version: 0.27.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: build
- dependency-name: "@esbuild/darwin-x64"
  dependency-version: 0.27.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: build
- dependency-name: "@esbuild/linux-x64"
  dependency-version: 0.27.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: build
- dependency-name: "@rollup/rollup-darwin-arm64"
  dependency-version: 4.53.5
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: build
- dependency-name: "@rollup/rollup-darwin-x64"
  dependency-version: 4.53.5
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: build
- dependency-name: "@rollup/rollup-linux-x64-gnu"
  dependency-version: 4.53.5
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: build
- dependency-name: "@swc/core-darwin-arm64"
  dependency-version: 1.15.5
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: build
- dependency-name: "@swc/core-darwin-x64"
  dependency-version: 1.15.5
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: build
- dependency-name: "@swc/core-linux-x64-gnu"
  dependency-version: 1.15.5
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: build
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-17 08:09:02 +01:00
dependabot[bot]
834dfd2947 build(deps-dev): bump @types/node in /ui in the types group (#13724)
Bumps the types group in /ui with 1 update: [@types/node](https://github.com/DefinitelyTyped/DefinitelyTyped/tree/HEAD/types/node).


Updates `@types/node` from 25.0.0 to 25.0.3
- [Release notes](https://github.com/DefinitelyTyped/DefinitelyTyped/releases)
- [Commits](https://github.com/DefinitelyTyped/DefinitelyTyped/commits/HEAD/types/node)

---
updated-dependencies:
- dependency-name: "@types/node"
  dependency-version: 25.0.3
  dependency-type: direct:development
  update-type: version-update:semver-patch
  dependency-group: types
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-17 08:08:41 +01:00
YannC
6edb88841f feat(jdbc): method without auditlog for setting repository (#13676)
* feat(jdbc): method without auditlog for setting repository

* test: add flaky annotation
2025-12-16 16:38:50 +01:00
Loïc Mathieu
5653531628 fix(test): avoid killing an already killed execution 2025-12-16 14:39:00 +01:00
github-actions[bot]
ee61276106 chore(core): localize to languages other than English (#13698)
Co-authored-by: GitHub Action <actions@github.com>
2025-12-16 14:28:22 +01:00
Barthélémy Ledoux
abcf76f7b4 fix: avoid blocking creation of flow when edition is restricted to a namespace (#13694) 2025-12-16 14:24:16 +01:00
YannC
67ada7f61b fix: remove JsonIgnore annotation from FlowWithSource and add schema(hidden=true) to Flow (#13681) 2025-12-16 14:23:56 +01:00
Florian Hussonnois
0c13633f77 fix(trigger): ScheduleOnDates should work with backfill
Changes:
* ScheduleOnDates must not be re-scheduled when trigger is updated
* ScheduleOnDates must not be scheduled on previous dates when created
* ScheduleOnDates should properly support backfill

Create a new SchedulableExecutionFactory class to hold all methods related
to schedulable triggers that are only used by core triggers

Related-to: #13673
2025-12-16 13:47:47 +01:00
Loïc Mathieu
a6cf2015ff fix(tests): concurrency test restarted 2025-12-16 13:42:42 +01:00
Sumit Shandillya
2f9216c70b fix(triggers): improve layout of action buttons in trigger table (#13658) 2025-12-16 17:50:46 +05:30
Piyush Bhaskar
1903e6fac5 fix(plugins): avoid list flash when opening plugin (#13690) 2025-12-16 17:38:09 +05:30
Loïc Mathieu
2d2cb00cab feat(execution): bring support for input and output processing in the run context
Part-of: https://github.com/kestra-io/kestra-ee/issues/4228

Encapsulate access to the FlowInputOutput from the RunContext in a new InputAndOutput component with a curated list of supported methods used by plugins.
2025-12-16 12:19:48 +01:00
Loïc Mathieu
01b5441d16 feat(trigger): refactor Schedule to not use the application context
Part-of:  https://github.com/kestra-io/kestra-ee/issues/4228
2025-12-16 12:19:30 +01:00
Loïc Mathieu
efc778e294 feat(system): save the edition in settings
This allows detecting an OSS -> EE migration.

Closes https://github.com/kestra-io/kestra-ee/issues/5106
2025-12-16 11:06:01 +01:00
Will Russell
60235a4e73 docs(task-runner): remove deprecated runner from example (#13654) 2025-12-16 10:01:27 +00:00
Piyush Bhaskar
b167c52e76 fix(core): properly sync default namespace filters from settings with default filter (#13685) 2025-12-16 15:30:55 +05:30
Florian Hussonnois
216b124294 feat(trigger): add support for concurrent trigger execution (#311)
Fixes: #311
2025-12-16 09:50:48 +01:00
vamsi172323
b6e4df8de2 refactor(core): remove usage of unnecessary i18n composable (#13683)
Closes https://github.com/kestra-io/kestra/issues/13649.

Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-12-16 08:26:27 +01:00
Loïc Mathieu
429e7c7945 feat(execution): allow listing the internal storage from the run context
Part-of: https://github.com/kestra-io/kestra-ee/issues/4228
2025-12-15 18:06:49 +01:00
mustafatarek
e302b4be4a chore(tests): update namings of evaluation tests 2025-12-15 16:11:25 +01:00
mustafatarek
8e7ad9ae25 chore(scheduler): revert changes in handle() and create the failed execution by emitting it directly from the catch block 2025-12-15 16:11:25 +01:00
mustafatarek
41a11abf16 chore(tests): add small notes to tests 2025-12-15 16:11:25 +01:00
mustafatarek
1be16d5e9d feat(tests): add more test coverage for trigger evaluation failure
- This covers failures propagated to evaluateScheduleTrigger() in the AbstractScheduler class for schedule triggers, such as invalid expressions and input-resolution issues
2025-12-15 16:11:25 +01:00
mustafatarek
e263224d7b fix(core): return backfill check at handleFailedEvaluatedTrigger() 2025-12-15 16:11:25 +01:00
mustafatarek
12b89588a6 fix(core): add failed execution with logs when scheduled triggers fail during evaluation 2025-12-15 16:11:25 +01:00
Loïc Mathieu
eae5eb80cb fix(test): use a separate tenant for each test 2025-12-15 15:41:21 +01:00
Loïc Mathieu
c0f6298484 feat(system)!: change logger name and disable flow logger by default
Change system logger name:
- execution -> executor
- trigger -> scheduler
- task -> worker

Add tenant and namespace in the name of loggers.

Disable by default the flow execution logger.
2025-12-15 15:41:09 +01:00
Barthélémy Ledoux
ba1d6b2232 fix: executing validation twice should display 2 errors (#13670) 2025-12-15 14:02:37 +01:00
Pratik Dey
048dcb80cc fix: Webhook-triggered executions do not generate system.correlationId label while direct API executions do 2025-12-15 12:37:23 +01:00
yuri
a81de811d7 feat(ui): make log buttons friendlier (#13404)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
Co-authored-by: Barthélémy Ledoux <bledoux@kestra.io>
2025-12-15 10:58:36 +01:00
Loïc Mathieu
a960a9f982 feat(plugin): bring cloneForPlugin to the RunContext
This replaces the usage of the RunContextInitializer, as plugins that use another plugin need it.

Part-of: https://github.com/kestra-io/kestra-ee/issues/4228
2025-12-15 09:58:54 +01:00
Miloš Paunović
c4d4fd935f chore(flows): make trigger icon not a button (#13666)
Closes https://github.com/kestra-io/kestra/issues/13634.
2025-12-15 09:31:22 +01:00
Suraj
f063a5a2d9 refactor(core): remove usage of unnecessary i18n composable (#13663)
Closes https://github.com/kestra-io/kestra/issues/13652.

Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-12-15 08:06:22 +01:00
Mohd Toukir Khan
ac91d5605f refactor(core): remove usage of unnecessary i18n composable (#13662)
Closes https://github.com/kestra-io/kestra/issues/13650.

Co-authored-by: Mohd Toukir Khan <Toukir@MacBook-Air-2.local>
Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-12-15 08:03:39 +01:00
Madhav Kaushik
e3d3c3651b refactor(core): remove usage of unnecessary i18n composable (#13661)
Closes https://github.com/kestra-io/kestra/issues/13651.

Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-12-15 08:00:48 +01:00
Sumit Shandillya
5b6836237e refactor(core): remove usage of unnecessary i18n composable (#13643)
Closes https://github.com/kestra-io/kestra/issues/13639.

Signed-off-by: Sumitsh28 <sumit.off28@gmail.com>
Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-12-12 15:38:39 +01:00
Lee Kyeong Joon
2f8284b133 refactor(core): remove usage of unnecessary i18n composable (#13645)
Closes https://github.com/kestra-io/kestra/issues/13590.

Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-12-12 15:36:50 +01:00
Siva Sai
42992fd7c3 fix(tests): add multiselect input tests for default and provided values 2025-12-12 14:45:44 +01:00
Siva Sai
3a481f93d3 fix(triggers): resolve MULTISELECT input defaults failing on scheduled executions 2025-12-12 14:45:44 +01:00
pengpeng
7e964ae563 fix(core): align login inputs in center (#13532)
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
2025-12-12 19:08:46 +05:30
Saif M
25e54edbc9 feat(core): always bundle the fonts into the build (#13624)
Closes https://github.com/kestra-io/kestra/issues/13599.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-12-12 13:46:28 +01:00
Ameen PJ
e88dc7af76 fix(core): remove top border for the first log entry (#13621) 2025-12-12 18:16:20 +05:30
Miloš Paunović
b7a027f0dc feat(system): display concurrency limits page based on property from endpoint (#13633)
Closes https://github.com/kestra-io/kestra-ee/issues/5882.
2025-12-12 09:51:58 +01:00
Miloš Paunović
98141d6010 chore(core): amend preprocessor options in vite config (#13632) 2025-12-12 09:48:34 +01:00
Miloš Paunović
bf119ab6df chore(core): align font sizes with the new scss variables from ui-libs (#13598) 2025-12-12 09:01:35 +01:00
Barthélémy Ledoux
9bd6353b77 fix: clicking on a plugin should access the plugin page (#13628)
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
2025-12-11 22:13:07 +01:00
Roman Acevedo
c0ab581cf1 ci: use npm ci instead of install 2025-12-11 18:53:15 +01:00
Loïc Mathieu
0f38e19663 chore(system): refactor NamespaceFilesUtils as a static class
Part-of: https://github.com/kestra-io/kestra-ee/issues/4228
2025-12-11 18:16:07 +01:00
Barthélémy Ledoux
0c14ea621c fix: if multiple definition of a task, call server (#13545) 2025-12-11 17:18:41 +01:00
Malay Dewangan
fb14e57a7c feat(plugin): add title and description to plugin cls 2025-12-11 21:26:11 +05:30
dependabot[bot]
09c707d865 build(deps): bump the minor group in /ui with 15 updates (#13616)
Bumps the minor group in /ui with 15 updates:

| Package | From | To |
| --- | --- | --- |
| [@vue-flow/core](https://github.com/bcakmakoglu/vue-flow/tree/HEAD/packages/core) | `1.47.0` | `1.48.0` |
| [@vueuse/core](https://github.com/vueuse/vueuse/tree/HEAD/packages/core) | `14.0.0` | `14.1.0` |
| [element-plus](https://github.com/element-plus/element-plus) | `2.11.8` | `2.12.0` |
| [posthog-js](https://github.com/PostHog/posthog-js) | `1.296.0` | `1.304.0` |
| [shiki](https://github.com/shikijs/shiki/tree/HEAD/packages/shiki) | `3.15.0` | `3.19.0` |
| [vue-sidebar-menu](https://github.com/yaminncco/vue-sidebar-menu) | `5.8.0` | `5.9.1` |
| [@playwright/test](https://github.com/microsoft/playwright) | `1.56.1` | `1.57.0` |
| [@shikijs/markdown-it](https://github.com/shikijs/shiki/tree/HEAD/packages/markdown-it) | `3.15.0` | `3.19.0` |
| [@typescript-eslint/parser](https://github.com/typescript-eslint/typescript-eslint/tree/HEAD/packages/parser) | `8.47.0` | `8.49.0` |
| [@vueuse/router](https://github.com/vueuse/vueuse/tree/HEAD/packages/router) | `14.0.0` | `14.1.0` |
| [jsdom](https://github.com/jsdom/jsdom) | `27.2.0` | `27.3.0` |
| [playwright](https://github.com/microsoft/playwright) | `1.56.1` | `1.57.0` |
| [prettier](https://github.com/prettier/prettier) | `3.6.2` | `3.7.4` |
| [sass](https://github.com/sass/dart-sass) | `1.94.1` | `1.96.0` |
| [typescript-eslint](https://github.com/typescript-eslint/typescript-eslint/tree/HEAD/packages/typescript-eslint) | `8.47.0` | `8.49.0` |


Updates `@vue-flow/core` from 1.47.0 to 1.48.0
- [Release notes](https://github.com/bcakmakoglu/vue-flow/releases)
- [Changelog](https://github.com/bcakmakoglu/vue-flow/blob/master/packages/core/CHANGELOG.md)
- [Commits](https://github.com/bcakmakoglu/vue-flow/commits/@vue-flow/core@1.48.0/packages/core)

Updates `@vueuse/core` from 14.0.0 to 14.1.0
- [Release notes](https://github.com/vueuse/vueuse/releases)
- [Commits](https://github.com/vueuse/vueuse/commits/v14.1.0/packages/core)

Updates `element-plus` from 2.11.8 to 2.12.0
- [Release notes](https://github.com/element-plus/element-plus/releases)
- [Changelog](https://github.com/element-plus/element-plus/blob/dev/CHANGELOG.en-US.md)
- [Commits](https://github.com/element-plus/element-plus/compare/2.11.8...2.12.0)

Updates `posthog-js` from 1.296.0 to 1.304.0
- [Release notes](https://github.com/PostHog/posthog-js/releases)
- [Changelog](https://github.com/PostHog/posthog-js/blob/main/CHANGELOG.md)
- [Commits](https://github.com/PostHog/posthog-js/compare/posthog-js@1.296.0...posthog-js@1.304.0)

Updates `shiki` from 3.15.0 to 3.19.0
- [Release notes](https://github.com/shikijs/shiki/releases)
- [Commits](https://github.com/shikijs/shiki/commits/v3.19.0/packages/shiki)

Updates `vue-sidebar-menu` from 5.8.0 to 5.9.1
- [Commits](https://github.com/yaminncco/vue-sidebar-menu/commits)

Updates `@playwright/test` from 1.56.1 to 1.57.0
- [Release notes](https://github.com/microsoft/playwright/releases)
- [Commits](https://github.com/microsoft/playwright/compare/v1.56.1...v1.57.0)

Updates `@shikijs/markdown-it` from 3.15.0 to 3.19.0
- [Release notes](https://github.com/shikijs/shiki/releases)
- [Commits](https://github.com/shikijs/shiki/commits/v3.19.0/packages/markdown-it)

Updates `@typescript-eslint/parser` from 8.47.0 to 8.49.0
- [Release notes](https://github.com/typescript-eslint/typescript-eslint/releases)
- [Changelog](https://github.com/typescript-eslint/typescript-eslint/blob/main/packages/parser/CHANGELOG.md)
- [Commits](https://github.com/typescript-eslint/typescript-eslint/commits/v8.49.0/packages/parser)

Updates `@vueuse/router` from 14.0.0 to 14.1.0
- [Release notes](https://github.com/vueuse/vueuse/releases)
- [Commits](https://github.com/vueuse/vueuse/commits/v14.1.0/packages/router)

Updates `jsdom` from 27.2.0 to 27.3.0
- [Release notes](https://github.com/jsdom/jsdom/releases)
- [Changelog](https://github.com/jsdom/jsdom/blob/main/Changelog.md)
- [Commits](https://github.com/jsdom/jsdom/compare/27.2.0...27.3.0)

Updates `playwright` from 1.56.1 to 1.57.0
- [Release notes](https://github.com/microsoft/playwright/releases)
- [Commits](https://github.com/microsoft/playwright/compare/v1.56.1...v1.57.0)

Updates `prettier` from 3.6.2 to 3.7.4
- [Release notes](https://github.com/prettier/prettier/releases)
- [Changelog](https://github.com/prettier/prettier/blob/main/CHANGELOG.md)
- [Commits](https://github.com/prettier/prettier/compare/3.6.2...3.7.4)

Updates `sass` from 1.94.1 to 1.96.0
- [Release notes](https://github.com/sass/dart-sass/releases)
- [Changelog](https://github.com/sass/dart-sass/blob/main/CHANGELOG.md)
- [Commits](https://github.com/sass/dart-sass/compare/1.94.1...1.96.0)

Updates `typescript-eslint` from 8.47.0 to 8.49.0
- [Release notes](https://github.com/typescript-eslint/typescript-eslint/releases)
- [Changelog](https://github.com/typescript-eslint/typescript-eslint/blob/main/packages/typescript-eslint/CHANGELOG.md)
- [Commits](https://github.com/typescript-eslint/typescript-eslint/commits/v8.49.0/packages/typescript-eslint)

---
updated-dependencies:
- dependency-name: "@vue-flow/core"
  dependency-version: 1.48.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: "@vueuse/core"
  dependency-version: 14.1.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: element-plus
  dependency-version: 2.12.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: posthog-js
  dependency-version: 1.304.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: shiki
  dependency-version: 3.19.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: vue-sidebar-menu
  dependency-version: 5.9.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: "@playwright/test"
  dependency-version: 1.57.0
  dependency-type: direct:development
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: "@shikijs/markdown-it"
  dependency-version: 3.19.0
  dependency-type: direct:development
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: "@typescript-eslint/parser"
  dependency-version: 8.49.0
  dependency-type: direct:development
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: "@vueuse/router"
  dependency-version: 14.1.0
  dependency-type: direct:development
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: jsdom
  dependency-version: 27.3.0
  dependency-type: direct:development
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: playwright
  dependency-version: 1.57.0
  dependency-type: direct:development
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: prettier
  dependency-version: 3.7.4
  dependency-type: direct:development
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: sass
  dependency-version: 1.96.0
  dependency-type: direct:development
  update-type: version-update:semver-minor
  dependency-group: minor
- dependency-name: typescript-eslint
  dependency-version: 8.49.0
  dependency-type: direct:development
  update-type: version-update:semver-minor
  dependency-group: minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-11 15:36:31 +01:00
Loïc Mathieu
86e08d71dd fix(test): make TracesTest.runningAFlowShouldGenerateTraces more tolerant of the number of traces
Rarely, in CI, only 6 traces are seen, most probably due to the asynchronous nature of OpenTelemetry
2025-12-11 14:55:46 +01:00
Miloš Paunović
94c00cedeb build(deps): improve storybook related grouping of dependabot pull requests (#13618) 2025-12-11 14:54:37 +01:00
Loïc Mathieu
eb12832b1e feat(system): add a config boolean indicating whether the concurrency view is enabled
Part-of: https://github.com/kestra-io/kestra-ee/issues/5882
2025-12-11 14:54:09 +01:00
Loïc Mathieu
687cefdfb9 fix(tests): use a different tenant for each concurrency test 2025-12-11 14:34:04 +01:00
Loïc Mathieu
8eae8aba72 feat(executions): add a protection mechanism to avoid any potential concurrency overflow
The concurrency limit is based on a counter that is incremented and decremented each time a flow starts or terminates.

This count should always be accurate.

But if some unexpected event occurs (a bug, or a user manually doing something wrong), the count may no longer be accurate.

To avoid any potential issue, when we decrement the counter, we check that the concurrency count is below the limit before unqueuing an execution.

Fixes #12031
Closes  #13301
2025-12-11 14:33:30 +01:00
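The guard this commit describes can be sketched as follows. This is a hypothetical `ConcurrencyLimiter`, not the actual Kestra implementation: the counter tracks running executions, and on decrement an execution is only unqueued while the count is strictly below the limit, so a drifted counter cannot cause an overflow.

```java
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrencyLimiter {
    private final int limit;
    private final AtomicInteger running = new AtomicInteger();

    ConcurrencyLimiter(int limit) { this.limit = limit; }

    /** Returns true if a new execution may start, incrementing the counter. */
    boolean tryStart() {
        while (true) {
            int current = running.get();
            if (current >= limit) return false;
            if (running.compareAndSet(current, current + 1)) return true;
        }
    }

    /** Decrements the counter and reports whether a queued execution may be unqueued. */
    boolean releaseAndCanUnqueue() {
        int after = running.decrementAndGet();
        if (after < 0) running.set(0); // self-heal a drifted counter
        return Math.max(after, 0) < limit; // the check described in the commit message
    }

    int running() { return Math.max(running.get(), 0); }
}
```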
dependabot[bot]
abdbb8d364 build(deps-dev): bump @types/node in /ui in the types group (#13613)
Bumps the types group in /ui with 1 update: [@types/node](https://github.com/DefinitelyTyped/DefinitelyTyped/tree/HEAD/types/node).


Updates `@types/node` from 24.10.2 to 25.0.0
- [Release notes](https://github.com/DefinitelyTyped/DefinitelyTyped/releases)
- [Commits](https://github.com/DefinitelyTyped/DefinitelyTyped/commits/HEAD/types/node)

---
updated-dependencies:
- dependency-name: "@types/node"
  dependency-version: 25.0.0
  dependency-type: direct:development
  update-type: version-update:semver-major
  dependency-group: types
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-11 14:10:45 +01:00
Barthélémy Ledoux
8a55ab3af6 fix: always load the right version of plugin docs (#13571) 2025-12-11 14:06:10 +01:00
dependabot[bot]
b7cb933e1e build(deps): bump the patch group in /ui with 9 updates (#13568)
Bumps the patch group in /ui with 9 updates:

| Package | From | To |
| --- | --- | --- |
| @kestra-io/ui-libs | `0.0.264` | `0.0.266` |
| [humanize-duration](https://github.com/EvanHahn/HumanizeDuration.js) | `3.33.1` | `3.33.2` |
| [pdfjs-dist](https://github.com/mozilla/pdf.js) | `5.4.394` | `5.4.449` |
| [vue](https://github.com/vuejs/core) | `3.5.24` | `3.5.25` |
| [yaml](https://github.com/eemeli/yaml) | `2.8.1` | `2.8.2` |
| [lint-staged](https://github.com/lint-staged/lint-staged) | `16.2.6` | `16.2.7` |
| [rimraf](https://github.com/isaacs/rimraf) | `6.1.0` | `6.1.2` |
| [rolldown-vite](https://github.com/vitejs/rolldown-vite/tree/HEAD/packages/vite) | `7.2.6` | `7.2.10` |
| [vue-tsc](https://github.com/vuejs/language-tools/tree/HEAD/packages/tsc) | `3.1.4` | `3.1.8` |


Updates `@kestra-io/ui-libs` from 0.0.264 to 0.0.266

Updates `humanize-duration` from 3.33.1 to 3.33.2
- [Changelog](https://github.com/EvanHahn/HumanizeDuration.js/blob/main/HISTORY.md)
- [Commits](https://github.com/EvanHahn/HumanizeDuration.js/compare/v3.33.1...v3.33.2)

Updates `pdfjs-dist` from 5.4.394 to 5.4.449
- [Release notes](https://github.com/mozilla/pdf.js/releases)
- [Commits](https://github.com/mozilla/pdf.js/compare/v5.4.394...v5.4.449)

Updates `vue` from 3.5.24 to 3.5.25
- [Release notes](https://github.com/vuejs/core/releases)
- [Changelog](https://github.com/vuejs/core/blob/main/CHANGELOG.md)
- [Commits](https://github.com/vuejs/core/compare/v3.5.24...v3.5.25)

Updates `yaml` from 2.8.1 to 2.8.2
- [Release notes](https://github.com/eemeli/yaml/releases)
- [Commits](https://github.com/eemeli/yaml/compare/v2.8.1...v2.8.2)

Updates `lint-staged` from 16.2.6 to 16.2.7
- [Release notes](https://github.com/lint-staged/lint-staged/releases)
- [Changelog](https://github.com/lint-staged/lint-staged/blob/main/CHANGELOG.md)
- [Commits](https://github.com/lint-staged/lint-staged/compare/v16.2.6...v16.2.7)

Updates `rimraf` from 6.1.0 to 6.1.2
- [Changelog](https://github.com/isaacs/rimraf/blob/main/CHANGELOG.md)
- [Commits](https://github.com/isaacs/rimraf/compare/v6.1.0...v6.1.2)

Updates `rolldown-vite` from 7.2.6 to 7.2.10
- [Release notes](https://github.com/vitejs/rolldown-vite/releases)
- [Changelog](https://github.com/vitejs/rolldown-vite/blob/rolldown-vite/packages/vite/CHANGELOG.md)
- [Commits](https://github.com/vitejs/rolldown-vite/commits/v7.2.10/packages/vite)

Updates `vue-tsc` from 3.1.4 to 3.1.8
- [Release notes](https://github.com/vuejs/language-tools/releases)
- [Changelog](https://github.com/vuejs/language-tools/blob/master/CHANGELOG.md)
- [Commits](https://github.com/vuejs/language-tools/commits/v3.1.8/packages/tsc)

---
updated-dependencies:
- dependency-name: "@kestra-io/ui-libs"
  dependency-version: 0.0.266
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch
- dependency-name: humanize-duration
  dependency-version: 3.33.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch
- dependency-name: pdfjs-dist
  dependency-version: 5.4.449
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch
- dependency-name: vue
  dependency-version: 3.5.25
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch
- dependency-name: yaml
  dependency-version: 2.8.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch
- dependency-name: lint-staged
  dependency-version: 16.2.7
  dependency-type: direct:development
  update-type: version-update:semver-patch
  dependency-group: patch
- dependency-name: rimraf
  dependency-version: 6.1.2
  dependency-type: direct:development
  update-type: version-update:semver-patch
  dependency-group: patch
- dependency-name: rolldown-vite
  dependency-version: 7.2.10
  dependency-type: direct:development
  update-type: version-update:semver-patch
  dependency-group: patch
- dependency-name: vue-tsc
  dependency-version: 3.1.8
  dependency-type: direct:development
  update-type: version-update:semver-patch
  dependency-group: patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-11 13:47:09 +01:00
Shivansh Sharma
3af003e5e4 chore(executions): properly handle the label create/update sequence (#13500)
Closes https://github.com/kestra-io/kestra/issues/13498.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-12-11 12:51:59 +01:00
Loïc Mathieu
c3861a5532 fix(system): merging collections should not duplicate items
Fixes https://github.com/kestra-io/kestra-ee/issues/6053
2025-12-11 12:14:48 +01:00
Piyush Bhaskar
ae1f10f45a refactor(core): remove the configuration details step (#13606) 2025-12-11 16:44:32 +05:30
Ashutosh Jha
612dccfb8c refactor(core): remove usage of unnecessary i18n composable (#13607)
Closes https://github.com/kestra-io/kestra/issues/13589.

Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-12-11 12:04:46 +01:00
Ameen PJ
2ae8df2f5f refactor(core): remove usage of unnecessary i18n composable (#13604)
Closes https://github.com/kestra-io/kestra/issues/13588.

Co-authored-by: ameenpj <ameenjami9@gmail.com>
Co-authored-by: MilosPaunovic <paun992@hotmail.com>
2025-12-11 12:01:32 +01:00
Saif M
1abfa74a16 chore(flows): add links to executions on the flows listing table (#13540)
Closes https://github.com/kestra-io/kestra/issues/13536.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-12-11 11:17:48 +01:00
195 changed files with 3060 additions and 1687 deletions

View File

@@ -51,7 +51,7 @@ updates:
   storybook:
     applies-to: version-updates
-    patterns: ["storybook*", "@storybook/*"]
+    patterns: ["storybook*", "@storybook/*", "eslint-plugin-storybook"]
   vitest:
     applies-to: version-updates
@@ -67,10 +67,10 @@ updates:
        "@types/*",
        "storybook*",
        "@storybook/*",
+       "eslint-plugin-storybook",
        "vitest",
        "@vitest/*",
        # Temporary exclusion of these packages from major updates
-       "eslint-plugin-storybook",
        "eslint-plugin-vue",
      ]
@@ -84,6 +84,7 @@ updates:
        "@types/*",
        "storybook*",
        "@storybook/*",
+       "eslint-plugin-storybook",
        "vitest",
        "@vitest/*",
        # Temporary exclusion of these packages from minor updates
@@ -102,6 +103,7 @@ updates:
        "@types/*",
        "storybook*",
        "@storybook/*",
+       "eslint-plugin-storybook",
        "vitest",
        "@vitest/*",
      ]

View File

@@ -43,7 +43,7 @@ jobs:
       # Upload dependency check report
       - name: Upload dependency check report
-        uses: actions/upload-artifact@v5
+        uses: actions/upload-artifact@v6
         if: ${{ always() }}
         with:
           name: dependency-check-report

View File

@@ -29,8 +29,8 @@ start_time2=$(date +%s)
 echo "cd ./ui"
 cd ./ui
-echo "npm i"
-npm i
+echo "npm ci"
+npm ci
 echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
 ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"

View File

@@ -21,7 +21,7 @@ plugins {
     // test
     id "com.adarshr.test-logger" version "4.0.0"
-    id "org.sonarqube" version "7.2.0.6526"
+    id "org.sonarqube" version "7.2.1.6560"
     id 'jacoco-report-aggregation'
     // helper
@@ -331,7 +331,7 @@ subprojects {
     }
     dependencies {
-        agent "org.aspectj:aspectjweaver:1.9.25"
+        agent "org.aspectj:aspectjweaver:1.9.25.1"
     }
     test {

View File

@@ -137,6 +137,11 @@ flyway:
       # We must ignore missing migrations as we delete some wrong or not used anymore migrations
       ignore-migration-patterns: "*:missing,*:future"
       out-of-order: true
+      properties:
+        flyway:
+          postgresql:
+            transactional:
+              lock: false
   mysql:
     enabled: true
     locations:

View File

@@ -82,8 +82,8 @@ dependencies {
     testImplementation "io.micronaut:micronaut-http-server-netty"
     testImplementation "io.micronaut:micronaut-management"
-    testImplementation "org.testcontainers:testcontainers:1.21.3"
-    testImplementation "org.testcontainers:junit-jupiter:1.21.3"
+    testImplementation "org.testcontainers:testcontainers:1.21.4"
+    testImplementation "org.testcontainers:junit-jupiter:1.21.4"
     testImplementation "org.bouncycastle:bcpkix-jdk18on"
     testImplementation "org.wiremock:wiremock-jetty12"

View File

@@ -3,6 +3,7 @@ package io.kestra.core.docs;
 import io.kestra.core.models.annotations.PluginSubGroup;
 import io.kestra.core.plugins.RegisteredPlugin;
 import io.micronaut.core.annotation.Nullable;
+import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -117,10 +118,17 @@ public class Plugin {
             .filter(not(io.kestra.core.models.Plugin::isInternal))
             .filter(clazzFilter)
             .filter(c -> !c.getName().startsWith("org.kestra."))
-            .map(c -> new PluginElementMetadata(c.getName(), io.kestra.core.models.Plugin.isDeprecated(c) ? true : null))
+            .map(c -> {
+                Schema schema = c.getAnnotation(Schema.class);
+                var title = Optional.ofNullable(schema).map(Schema::title).filter(t -> !t.isEmpty()).orElse(null);
+                var description = Optional.ofNullable(schema).map(Schema::description).filter(d -> !d.isEmpty()).orElse(null);
+                var deprecated = io.kestra.core.models.Plugin.isDeprecated(c) ? true : null;
+                return new PluginElementMetadata(c.getName(), deprecated, title, description);
+            })
             .toList();
     }
-    public record PluginElementMetadata(String cls, Boolean deprecated) {
-    }
+    public record PluginElementMetadata(String cls, Boolean deprecated, String title, String description) {}
 }

View File

@@ -0,0 +1,37 @@
+package io.kestra.core.exceptions;
+
+import io.kestra.core.models.flows.Data;
+import io.kestra.core.models.flows.Input;
+import io.kestra.core.models.flows.Output;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Exception that can be thrown when Inputs/Outputs have validation problems.
+ */
+public class InputOutputValidationException extends KestraRuntimeException {
+
+    public InputOutputValidationException(String message) {
+        super(message);
+    }
+
+    public static InputOutputValidationException of(String message, Input<?> input) {
+        String inputMessage = "Invalid value for input `" + input.getId() + "`. Cause: " + message;
+        return new InputOutputValidationException(inputMessage);
+    }
+
+    public static InputOutputValidationException of(String message, Output output) {
+        String outputMessage = "Invalid value for output `" + output.getId() + "`. Cause: " + message;
+        return new InputOutputValidationException(outputMessage);
+    }
+
+    public static InputOutputValidationException of(String message) {
+        return new InputOutputValidationException(message);
+    }
+
+    public static InputOutputValidationException merge(Set<InputOutputValidationException> exceptions) {
+        String combinedMessage = exceptions.stream()
+            .map(InputOutputValidationException::getMessage)
+            .collect(Collectors.joining(System.lineSeparator()));
+        return new InputOutputValidationException(combinedMessage);
+    }
+}
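The `merge()` helper above joins individual violation messages into one exception. A minimal standalone sketch of that joining logic (hypothetical `MessageMerger`, not the Kestra API; order is sorted here only to keep the sketch deterministic):

```java
import java.util.Set;
import java.util.stream.Collectors;

class MessageMerger {
    // Combine several validation messages into one, separated by line breaks.
    static String merge(Set<String> messages) {
        return messages.stream()
            .sorted()
            .collect(Collectors.joining(System.lineSeparator()));
    }
}
```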

View File

@@ -1,6 +1,8 @@
 package io.kestra.core.exceptions;
 
 import java.io.Serial;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * The top-level {@link KestraRuntimeException} for non-recoverable errors.

View File

@@ -1,5 +0,0 @@
-package io.kestra.core.models;
-
-public interface DeletedInterface {
-    boolean isDeleted();
-}

View File

@@ -26,6 +26,7 @@ public record Label(
     public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
     public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
     public static final String TEST = SYSTEM_PREFIX + "test";
+    public static final String FROM = SYSTEM_PREFIX + "from";
 
     /**
      * Static helper method for converting a list of labels to a nested map.

View File

@@ -16,6 +16,7 @@ import jakarta.validation.constraints.NotNull;
 public class Setting {
     public static final String INSTANCE_UUID = "instance.uuid";
     public static final String INSTANCE_VERSION = "instance.version";
+    public static final String INSTANCE_EDITION = "instance.edition";
 
     @NotNull
     private String key;

View File

@@ -0,0 +1,18 @@
+package io.kestra.core.models;
+
+/**
+ * This interface marks entities that implement a soft deletion mechanism.
+ * Soft deletion is based on a <code>deleted</code> field that is set to <code>true</code> when the entity is deleted.
+ * Physical deletion either never occurs or occurs in a dedicated purge mechanism.
+ */
+public interface SoftDeletable<T> {
+    /**
+     * Whether an entity is deleted or not.
+     */
+    boolean isDeleted();
+
+    /**
+     * Delete the current entity: set its <code>deleted</code> field to <code>true</code>.
+     */
+    T toDeleted();
+}
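The contract above can be illustrated with a hypothetical immutable entity (not a Kestra class): `toDeleted()` returns a copy flagged as deleted instead of physically removing anything.

```java
// Local copy of the interface so the sketch is self-contained.
interface SoftDeletable<T> {
    boolean isDeleted();
    T toDeleted();
}

// Hypothetical entity: soft deletion produces a new instance with deleted=true.
record Note(String id, boolean deleted) implements SoftDeletable<Note> {
    @Override
    public boolean isDeleted() { return deleted; }

    @Override
    public Note toDeleted() { return new Note(id, true); }
}
```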

View File

@@ -1,7 +1,7 @@
 package io.kestra.core.models.dashboards;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import io.kestra.core.models.DeletedInterface;
+import io.kestra.core.models.SoftDeletable;
 import io.kestra.core.models.HasUID;
 import io.kestra.core.models.dashboards.charts.Chart;
 import io.kestra.core.utils.IdUtils;
@@ -26,7 +26,7 @@ import java.util.Objects;
 @NoArgsConstructor
 @Introspected
 @ToString
-public class Dashboard implements HasUID, DeletedInterface {
+public class Dashboard implements HasUID, SoftDeletable<Dashboard> {
     @Hidden
     @Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
     private String tenantId;
@@ -71,6 +71,7 @@ public class Dashboard implements HasUID, DeletedInterface {
         );
     }
 
+    @Override
     public Dashboard toDeleted() {
         return this.toBuilder()
             .deleted(true)

View File

@@ -11,7 +11,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Streams;
 import io.kestra.core.debug.Breakpoint;
 import io.kestra.core.exceptions.InternalException;
-import io.kestra.core.models.DeletedInterface;
+import io.kestra.core.models.SoftDeletable;
 import io.kestra.core.models.Label;
 import io.kestra.core.models.TenantInterface;
 import io.kestra.core.models.flows.Flow;
@@ -53,7 +53,7 @@ import java.util.zip.CRC32;
 @AllArgsConstructor
 @ToString
 @EqualsAndHashCode
-public class Execution implements DeletedInterface, TenantInterface {
+public class Execution implements SoftDeletable<Execution>, TenantInterface {
     @With
     @Hidden
@@ -1111,7 +1111,7 @@ public class Execution implements DeletedInterface, TenantInterface {
         .toList();
     }
 
+    @Override
     public Execution toDeleted() {
         return this.toBuilder()
             .deleted(true)

View File

@@ -1,7 +1,6 @@
 package io.kestra.core.models.executions;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
-import io.kestra.core.models.DeletedInterface;
 import io.kestra.core.models.TenantInterface;
 import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.models.triggers.AbstractTrigger;
@@ -22,7 +21,7 @@ import java.util.stream.Stream;
 @Value
 @Builder(toBuilder = true)
-public class LogEntry implements DeletedInterface, TenantInterface {
+public class LogEntry implements TenantInterface {
     @Hidden
     @Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
     String tenantId;
@@ -57,10 +56,6 @@ public class LogEntry implements DeletedInterface, TenantInterface {
     String message;
 
-    @NotNull
-    @Builder.Default
-    boolean deleted = false;
-
     @Nullable
     ExecutionKind executionKind;

View File

@@ -1,7 +1,6 @@
 package io.kestra.core.models.executions;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
-import io.kestra.core.models.DeletedInterface;
 import io.kestra.core.models.TenantInterface;
 import io.kestra.core.models.executions.metrics.Counter;
 import io.kestra.core.models.executions.metrics.Gauge;
@@ -18,7 +17,7 @@ import jakarta.validation.constraints.Pattern;
 @Value
 @Builder(toBuilder = true)
-public class MetricEntry implements DeletedInterface, TenantInterface {
+public class MetricEntry implements TenantInterface {
     @Hidden
     @Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
     String tenantId;
@@ -54,10 +53,6 @@ public class MetricEntry implements DeletedInterface, TenantInterface {
     @Nullable
     Map<String, String> tags;
 
-    @NotNull
-    @Builder.Default
-    boolean deleted = false;
-
     @Nullable
     ExecutionKind executionKind;

View File

@@ -3,9 +3,7 @@ package io.kestra.core.models.executions;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import io.kestra.core.models.TenantInterface;
 import io.kestra.core.models.flows.State;
-import io.kestra.core.models.tasks.FlowableTask;
 import io.kestra.core.models.tasks.ResolvedTask;
-import io.kestra.core.models.tasks.Task;
 import io.kestra.core.models.tasks.retrys.AbstractRetry;
 import io.kestra.core.utils.IdUtils;
 import io.swagger.v3.oas.annotations.Hidden;
@@ -95,8 +93,16 @@ public class TaskRun implements TenantInterface {
             this.forceExecution
         );
     }
 
+    public TaskRun withStateAndAttempt(State.Type state) {
+        List<TaskRunAttempt> newAttempts = new ArrayList<>(this.attempts != null ? this.attempts : List.of());
+        if (newAttempts.isEmpty()) {
+            newAttempts.add(TaskRunAttempt.builder().state(new State(state)).build());
+        } else {
+            TaskRunAttempt updatedLast = newAttempts.getLast().withState(state);
+            newAttempts.set(newAttempts.size() - 1, updatedLast);
+        }
-    public TaskRun replaceState(State newState) {
         return new TaskRun(
             this.tenantId,
             this.id,
@@ -106,9 +112,9 @@ public class TaskRun implements TenantInterface {
             this.taskId,
             this.parentTaskRunId,
             this.value,
-            this.attempts,
+            newAttempts,
             this.outputs,
-            newState,
+            this.state.withState(state),
             this.iteration,
             this.dynamic,
             this.forceExecution

View File

@@ -1,7 +1,5 @@
 package io.kestra.core.models.flows;
 
-import io.kestra.core.models.validations.ManualConstraintViolation;
-import jakarta.validation.ConstraintViolationException;
 
 /**
  * Interface for defining an identifiable and typed data.
@@ -29,16 +27,4 @@ public interface Data {
      */
     String getDisplayName();
 
-    @SuppressWarnings("unchecked")
-    default ConstraintViolationException toConstraintViolationException(String message, Object value) {
-        Class<Data> cls = (Class<Data>) this.getClass();
-        return ManualConstraintViolation.toConstraintViolationException(
-            "Invalid " + (this instanceof Output ? "output" : "input") + " for `" + getId() + "`, " + message + ", but received `" + value + "`",
-            this,
-            cls,
-            this.getId(),
-            value
-        );
-    }
 }

View File

@@ -1,6 +1,5 @@
 package io.kestra.core.models.flows;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -130,7 +129,7 @@ public class Flow extends AbstractFlow implements HasUID {
     @Valid
     @PluginProperty
     List<SLA> sla;
 
     @Schema(
         title = "Conditions evaluated before the flow is executed.",
         description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."
@@ -343,6 +342,7 @@ public class Flow extends AbstractFlow implements HasUID {
         }
     }
 
+    @Override
     public Flow toDeleted() {
         return this.toBuilder()
             .revision(this.revision + 1)
@@ -355,7 +355,7 @@ public class Flow extends AbstractFlow implements HasUID {
      * To be conservative a flow MUST not return any source.
      */
     @Override
-    @JsonIgnore
+    @Schema(hidden = true)
     public String getSource() {
         return null;
     }


@@ -58,4 +58,9 @@ public class FlowForExecution extends AbstractFlow {
     public String getSource() {
         return null;
     }
+
+    @Override
+    public FlowForExecution toDeleted() {
+        throw new UnsupportedOperationException("Can't delete a FlowForExecution");
+    }
 }


@@ -5,7 +5,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import io.kestra.core.models.DeletedInterface;
+import io.kestra.core.models.SoftDeletable;
 import io.kestra.core.models.HasSource;
 import io.kestra.core.models.HasUID;
 import io.kestra.core.models.Label;
@@ -27,7 +27,7 @@ import java.util.stream.Collectors;
  * The base interface for FLow.
  */
 @JsonDeserialize(as = GenericFlow.class)
-public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface, HasUID, HasSource {
+public interface FlowInterface extends FlowId, SoftDeletable<FlowInterface>, TenantInterface, HasUID, HasSource {
     Pattern YAML_REVISION_MATCHER = Pattern.compile("(?m)^revision: \\d+\n?");
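The hunks above swap `DeletedInterface` for a self-typed `SoftDeletable<T>` whose `toDeleted()` returns a copy of the same concrete type. A minimal, hypothetical sketch of that pattern (the names and shape of Kestra's actual `SoftDeletable` interface may differ):

```java
// Hypothetical re-creation of the self-typed soft-deletion contract.
// The recursive bound lets each implementor return its own type from toDeleted().
interface SoftDeletable<T extends SoftDeletable<T>> {
    boolean isDeleted();

    // Returns a deleted *copy* instead of mutating the entity in place.
    T toDeleted();
}

// Illustrative immutable entity, not a real Kestra class.
record KvMetadata(String name, boolean deleted) implements SoftDeletable<KvMetadata> {
    @Override
    public boolean isDeleted() {
        return deleted;
    }

    @Override
    public KvMetadata toDeleted() {
        return new KvMetadata(name, true);
    }
}

public class SoftDeleteSketch {
    public static void main(String[] args) {
        KvMetadata live = new KvMetadata("my-key", false);
        KvMetadata gone = live.toDeleted();
        System.out.println(gone.isDeleted()); // true
        System.out.println(live.isDeleted()); // false: the original is untouched
    }
}
```

Because the return type is `T` rather than a bare interface, callers like `PersistedKvMetadata` or `Flow` keep their concrete type after deletion without casts.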


@@ -1,14 +1,12 @@
 package io.kestra.core.models.flows;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import io.micronaut.core.annotation.Introspected;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.ToString;
 import lombok.experimental.SuperBuilder;
+import io.swagger.v3.oas.annotations.media.Schema;
 
-import java.util.Objects;
-import java.util.regex.Pattern;
-
 @SuperBuilder(toBuilder = true)
 @Getter
@@ -48,7 +46,7 @@ public class FlowWithSource extends Flow {
     }
 
     @Override
-    @JsonIgnore(value = false)
+    @Schema(hidden = false)
     public String getSource() {
         return this.source;
     }


@@ -96,4 +96,9 @@ public class GenericFlow extends AbstractFlow implements HasUID {
     public List<GenericTrigger> getTriggers() {
         return Optional.ofNullable(triggers).orElse(List.of());
     }
+
+    @Override
+    public FlowInterface toDeleted() {
+        throw new UnsupportedOperationException("Can't delete a GenericFlow");
+    }
 }


@@ -1,10 +1,12 @@
 package io.kestra.core.models.flows.input;
 
+import io.kestra.core.exceptions.InputOutputValidationException;
 import io.kestra.core.models.flows.Input;
 import jakarta.annotation.Nullable;
-import jakarta.validation.ConstraintViolationException;
 import jakarta.validation.constraints.NotNull;
 
+import java.util.Set;
+
 /**
  * Represents an input along with its associated value and validation state.
  *
@@ -12,15 +14,15 @@ import jakarta.validation.constraints.NotNull;
  * @param value The provided value for the input.
  * @param enabled {@code true} if the input is enabled; {@code false} otherwise.
  * @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
- * @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
+ * @param exceptions The validation exceptions, if the input value is invalid; {@code null} otherwise.
  */
 public record InputAndValue(
     Input<?> input,
     Object value,
     boolean enabled,
     boolean isDefault,
-    ConstraintViolationException exception) {
+    Set<InputOutputValidationException> exceptions) {
 
     /**
      * Creates a new {@link InputAndValue} instance.
     *


@@ -7,6 +7,7 @@ import io.kestra.core.models.property.Property;
 import io.kestra.core.models.validations.ManualConstraintViolation;
 import io.kestra.core.validations.Regex;
 import io.swagger.v3.oas.annotations.media.Schema;
+import jakarta.validation.ConstraintViolation;
 import jakarta.validation.ConstraintViolationException;
 import jakarta.validation.constraints.NotNull;
 import lombok.Builder;
@@ -14,10 +15,7 @@ import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.experimental.SuperBuilder;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
+import java.util.*;
 import java.util.function.Function;
 
 @SuperBuilder
@@ -77,30 +75,35 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
     @Override
     public void validate(List<String> inputs) throws ConstraintViolationException {
+        Set<ConstraintViolation<?>> violations = new HashSet<>();
+
         if (values != null && options != null) {
-            throw ManualConstraintViolation.toConstraintViolationException(
+            violations.add(ManualConstraintViolation.of(
                 "you can't define both `values` and `options`",
                 this,
                 MultiselectInput.class,
                 getId(),
                 ""
-            );
+            ));
         }
 
         if (!this.getAllowCustomValue()) {
             for (String input : inputs) {
                 List<@Regex String> finalValues = this.values != null ? this.values : this.options;
                 if (!finalValues.contains(input)) {
-                    throw ManualConstraintViolation.toConstraintViolationException(
-                        "it must match the values `" + finalValues + "`",
+                    violations.add(ManualConstraintViolation.of(
+                        "value `" + input + "` doesn't match the values `" + finalValues + "`",
                         this,
                         MultiselectInput.class,
                         getId(),
                         input
-                    );
+                    ));
                 }
             }
         }
+
+        if (!violations.isEmpty()) {
+            throw ManualConstraintViolation.toConstraintViolationException(violations);
+        }
     }
 
     /** {@inheritDoc} **/
@@ -145,7 +148,7 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
         String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
         throw ManualConstraintViolation.toConstraintViolationException(
-            "Invalid expression result. Expected a list of strings, but received " + type,
+            "Invalid expression result. Expected a list of strings",
             this,
             MultiselectInput.class,
             getId(),
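The rewritten `validate` accumulates every violation into a set and throws once at the end, so a caller sees all bad values instead of only the first. The control flow can be sketched with plain strings standing in for `ConstraintViolation` (names here are illustrative, not Kestra's API):

```java
import java.util.ArrayList;
import java.util.List;

public class AccumulatingValidation {
    // Validates every input against the allowed values and reports all
    // mismatches in a single exception, mirroring the accumulate-then-throw
    // pattern in the diff above.
    static void validate(List<String> inputs, List<String> allowed) {
        List<String> violations = new ArrayList<>();
        for (String input : inputs) {
            if (!allowed.contains(input)) {
                violations.add("value `" + input + "` doesn't match the values `" + allowed + "`");
            }
        }
        if (!violations.isEmpty()) {
            throw new IllegalArgumentException(String.join("; ", violations));
        }
    }

    public static void main(String[] args) {
        try {
            validate(List.of("a", "x", "y"), List.of("a", "b"));
        } catch (IllegalArgumentException e) {
            // Both "x" and "y" are reported together, not just the first.
            System.out.println(e.getMessage());
        }
    }
}
```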


@@ -125,7 +125,7 @@ public class SelectInput extends Input<String> implements RenderableInput {
         String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
         throw ManualConstraintViolation.toConstraintViolationException(
-            "Invalid expression result. Expected a list of strings, but received " + type,
+            "Invalid expression result. Expected a list of strings",
             this,
             SelectInput.class,
             getId(),


@@ -1,6 +1,6 @@
 package io.kestra.core.models.kv;
 
-import io.kestra.core.models.DeletedInterface;
+import io.kestra.core.models.SoftDeletable;
 import io.kestra.core.models.HasUID;
 import io.kestra.core.models.TenantInterface;
 import io.kestra.core.storages.kv.KVEntry;
@@ -22,7 +22,7 @@ import java.util.Optional;
 @FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
 @ToString
 @EqualsAndHashCode
-public class PersistedKvMetadata implements DeletedInterface, TenantInterface, HasUID {
+public class PersistedKvMetadata implements SoftDeletable<PersistedKvMetadata>, TenantInterface, HasUID {
     @With
     @Hidden
     @Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
@@ -83,6 +83,7 @@ public class PersistedKvMetadata implements DeletedInterface, TenantInterface, HasUID {
         return this.toBuilder().updated(Instant.now()).last(true).build();
     }
 
+    @Override
     public PersistedKvMetadata toDeleted() {
         return this.toBuilder().updated(Instant.now()).deleted(true).build();
     }


@@ -17,8 +17,4 @@ public class Namespace implements NamespaceInterface {
     @NotNull
     @Pattern(regexp="^[a-z0-9][a-z0-9._-]*")
     protected String id;
-
-    @NotNull
-    @Builder.Default
-    boolean deleted = false;
 }


@@ -1,9 +1,8 @@
 package io.kestra.core.models.namespaces;
 
-import io.kestra.core.models.DeletedInterface;
 import io.kestra.core.models.HasUID;
 
-public interface NamespaceInterface extends DeletedInterface, HasUID {
+public interface NamespaceInterface extends HasUID {
     String getId();


@@ -2,8 +2,8 @@ package io.kestra.core.models.namespaces.files;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import io.kestra.core.models.DeletedInterface;
 import io.kestra.core.models.HasUID;
+import io.kestra.core.models.SoftDeletable;
 import io.kestra.core.models.TenantInterface;
 import io.kestra.core.storages.FileAttributes;
 import io.kestra.core.storages.NamespaceFile;
@@ -24,7 +24,7 @@ import java.time.Instant;
 @FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
 @ToString
 @EqualsAndHashCode
-public class NamespaceFileMetadata implements DeletedInterface, TenantInterface, HasUID {
+public class NamespaceFileMetadata implements SoftDeletable<NamespaceFileMetadata>, TenantInterface, HasUID {
     @With
     @Hidden
     @Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
@@ -116,6 +116,7 @@ public class NamespaceFileMetadata implements DeletedInterface, TenantInterface,
         return this.toBuilder().updated(saveDate).last(true).build();
     }
 
+    @Override
     public NamespaceFileMetadata toDeleted() {
         return this.toBuilder().deleted(true).updated(Instant.now()).build();
     }


@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import com.google.common.annotations.VisibleForTesting;
 import io.kestra.core.exceptions.IllegalVariableEvaluationException;
 import io.kestra.core.runners.RunContext;
+import io.kestra.core.runners.RunContextProperty;
 import io.kestra.core.serializers.JacksonMapper;
 import io.swagger.v3.oas.annotations.media.Schema;
 import jakarta.validation.constraints.NotNull;
@@ -156,9 +157,9 @@ public class Property<T> {
     /**
      * Render a property, then convert it to its target type.<br>
      * <p>
-     * This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
+     * This method is designed to be used only by the {@link RunContextProperty}.
      *
-     * @see io.kestra.core.runners.RunContextProperty#as(Class)
+     * @see RunContextProperty#as(Class)
      */
     public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
         return as(property, context, clazz, Map.of());
@@ -167,25 +168,57 @@ public class Property<T> {
     /**
      * Render a property with additional variables, then convert it to its target type.<br>
      * <p>
-     * This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
+     * This method is designed to be used only by the {@link RunContextProperty}.
     *
-     * @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
+     * @see RunContextProperty#as(Class, Map)
     */
     public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
         if (property.skipCache || property.value == null) {
             String rendered = context.render(property.expression, variables);
-            property.value = MAPPER.convertValue(rendered, clazz);
+            property.value = deserialize(rendered, clazz);
         }
 
         return property.value;
     }
 
+    private static <T> T deserialize(Object rendered, Class<T> clazz) throws IllegalVariableEvaluationException {
+        try {
+            return MAPPER.convertValue(rendered, clazz);
+        } catch (IllegalArgumentException e) {
+            if (rendered instanceof String str) {
+                try {
+                    return MAPPER.readValue(str, clazz);
+                } catch (JsonProcessingException ex) {
+                    throw new IllegalVariableEvaluationException(ex);
+                }
+            }
+            throw new IllegalVariableEvaluationException(e);
+        }
+    }
+
+    private static <T> T deserialize(Object rendered, JavaType type) throws IllegalVariableEvaluationException {
+        try {
+            return MAPPER.convertValue(rendered, type);
+        } catch (IllegalArgumentException e) {
+            if (rendered instanceof String str) {
+                try {
+                    return MAPPER.readValue(str, type);
+                } catch (JsonProcessingException ex) {
+                    throw new IllegalVariableEvaluationException(ex);
+                }
+            }
+            throw new IllegalVariableEvaluationException(e);
+        }
+    }
+
     /**
      * Render a property then convert it as a list of target type.<br>
      * <p>
-     * This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
+     * This method is designed to be used only by the {@link RunContextProperty}.
     *
-     * @see io.kestra.core.runners.RunContextProperty#asList(Class)
+     * @see RunContextProperty#asList(Class)
     */
     public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
         return asList(property, context, itemClazz, Map.of());
@@ -194,37 +227,39 @@ public class Property<T> {
     /**
      * Render a property with additional variables, then convert it as a list of target type.<br>
      * <p>
-     * This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
+     * This method is designed to be used only by the {@link RunContextProperty}.
     *
-     * @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
+     * @see RunContextProperty#asList(Class, Map)
     */
     @SuppressWarnings("unchecked")
     public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
         if (property.skipCache || property.value == null) {
             JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
-            try {
-                String trimmedExpression = property.expression.trim();
-                // We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
-                // Doing that allows us to, if it's an expression, first render then read it as a list.
-                if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
-                    property.value = MAPPER.readValue(context.render(property.expression, variables), type);
-                }
-                // Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
-                else {
-                    List<?> asRawList = MAPPER.readValue(property.expression, List.class);
-                    property.value = (T) asRawList.stream()
-                        .map(throwFunction(item -> {
-                            if (item instanceof String str) {
-                                return MAPPER.convertValue(context.render(str, variables), itemClazz);
-                            } else if (item instanceof Map map) {
-                                return MAPPER.convertValue(context.render(map, variables), itemClazz);
-                            }
-                            return item;
-                        }))
-                        .toList();
-                }
-            } catch (JsonProcessingException e) {
-                throw new IllegalVariableEvaluationException(e);
+            String trimmedExpression = property.expression.trim();
+            // We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
+            // Doing that allows us to, if it's an expression, first render then read it as a list.
+            if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
+                property.value = deserialize(context.render(property.expression, variables), type);
+            }
+            // Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
+            else {
+                List<?> asRawList = deserialize(property.expression, List.class);
+                property.value = (T) asRawList.stream()
+                    .map(throwFunction(item -> {
+                        Object rendered = null;
+                        if (item instanceof String str) {
+                            rendered = context.render(str, variables);
+                        } else if (item instanceof Map map) {
+                            rendered = context.render(map, variables);
+                        }
+
+                        if (rendered != null) {
+                            return deserialize(rendered, itemClazz);
+                        }
+
+                        return item;
+                    }))
+                    .toList();
             }
         }
@@ -234,9 +269,9 @@ public class Property<T> {
     /**
      * Render a property then convert it as a map of target types.<br>
      * <p>
-     * This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
+     * This method is designed to be used only by the {@link RunContextProperty}.
     *
-     * @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
+     * @see RunContextProperty#asMap(Class, Class)
     */
     public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
         return asMap(property, runContext, keyClass, valueClass, Map.of());
@@ -248,7 +283,7 @@ public class Property<T> {
      * This method is safe to be used as many times as you want as the rendering and conversion will be cached.
      * Warning, due to the caching mechanism, this method is not thread-safe.
     *
-     * @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
+     * @see RunContextProperty#asMap(Class, Class, Map)
     */
     @SuppressWarnings({"rawtypes", "unchecked"})
     public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
@@ -260,12 +295,12 @@ public class Property<T> {
             // We need to detect if the expression is already a map or if it's a pebble expression (for eg. referencing a variable containing a map).
             // Doing that allows us to, if it's an expression, first render then read it as a map.
             if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
-                property.value = MAPPER.readValue(runContext.render(property.expression, variables), targetMapType);
+                property.value = deserialize(runContext.render(property.expression, variables), targetMapType);
             }
             // Otherwise if it's already a map we read it as a map first then render it from run context which handle map rendering by rendering each entry of the map (otherwise it will fail with nested expressions in values for eg.)
             else {
                 Map asRawMap = MAPPER.readValue(property.expression, Map.class);
-                property.value = MAPPER.convertValue(runContext.render(asRawMap, variables), targetMapType);
+                property.value = deserialize(runContext.render(asRawMap, variables), targetMapType);
             }
         } catch (JsonProcessingException e) {
             throw new IllegalVariableEvaluationException(e);
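The new private `deserialize` helpers try Jackson's `convertValue` first and, when that throws `IllegalArgumentException` on a value that is still a raw `String`, fall back to parsing the string with `readValue`. The control flow can be sketched without Jackson, using a caller-supplied parser as a stand-in (names here are illustrative):

```java
import java.util.function.Function;

public class FallbackDeserialize {
    // Try a direct conversion first (here: an isInstance check + cast); if the
    // rendered value is still a raw String, fall back to parsing it. This
    // mirrors the convertValue -> readValue fallback in the diff above, with
    // `parser` standing in for Jackson's readValue.
    static <T> T deserialize(Object rendered, Class<T> clazz, Function<String, T> parser) {
        if (clazz.isInstance(rendered)) {
            return clazz.cast(rendered);
        }
        if (rendered instanceof String str) {
            return parser.apply(str);
        }
        throw new IllegalArgumentException("Cannot convert " + rendered + " to " + clazz);
    }

    public static void main(String[] args) {
        // Already the right type: the direct path succeeds.
        Integer direct = deserialize(42, Integer.class, Integer::valueOf);
        // A rendered expression is still a String: the parser fallback kicks in.
        Integer parsed = deserialize("42", Integer.class, Integer::valueOf);
        System.out.println(direct + " " + parsed); // 42 42
    }
}
```

The fallback matters because a Pebble expression always renders to a string, even when the target type is a list, map, or number; parsing that string is the only way to recover the structured value.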


@@ -7,8 +7,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
 import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
-import io.kestra.core.models.DeletedInterface;
 import io.kestra.core.models.HasUID;
+import io.kestra.core.models.SoftDeletable;
 import io.kestra.core.models.TenantInterface;
 import io.kestra.core.models.tasks.Task;
 import io.kestra.core.models.validations.ManualConstraintViolation;
@@ -35,7 +35,7 @@ import jakarta.validation.constraints.Pattern;
 @Introspected
 @ToString
 @EqualsAndHashCode
-public class Template implements DeletedInterface, TenantInterface, HasUID {
+public class Template implements SoftDeletable<Template>, TenantInterface, HasUID {
     private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml().copy()
         .setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
             @Override
@@ -141,6 +141,7 @@ public class Template implements DeletedInterface, TenantInterface, HasUID {
         }
     }
 
+    @Override
     public Template toDeleted() {
         return new Template(
             this.tenantId,


@@ -82,6 +82,12 @@ abstract public class AbstractTrigger implements TriggerInterface {
     @PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
     private boolean failOnTriggerError = false;
 
+    @PluginProperty(group = PluginProperty.CORE_GROUP)
+    @Schema(
+        title = "Specifies whether a trigger is allowed to start a new execution even if a previous run is still in progress."
+    )
+    private boolean allowConcurrent = false;
+
     /**
      * For backward compatibility: we rename minLogLevel to logLevel.
      * @deprecated use {@link #logLevel} instead


@@ -1,22 +1,37 @@
 package io.kestra.core.models.triggers;
 
 import io.kestra.core.exceptions.IllegalVariableEvaluationException;
+import io.kestra.core.models.annotations.PluginProperty;
 import io.kestra.core.models.conditions.ConditionContext;
 import io.kestra.core.runners.RunContext;
+import io.swagger.v3.oas.annotations.media.Schema;
 
 import java.time.ZonedDateTime;
+import java.util.Map;
 
 public interface Schedulable extends PollingTriggerInterface{
     String PLUGIN_PROPERTY_RECOVER_MISSED_SCHEDULES = "recoverMissedSchedules";
 
+    @Schema(
+        title = "The inputs to pass to the scheduled flow"
+    )
+    @PluginProperty(dynamic = true)
+    Map<String, Object> getInputs();
+
+    @Schema(
+        title = "Action to take in the case of missed schedules",
+        description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
+            "The default is `ALL` unless a different value is configured using the global plugin configuration."
+    )
+    @PluginProperty
+    RecoverMissedSchedules getRecoverMissedSchedules();
+
     /**
      * Compute the previous evaluation of a trigger.
      * This is used when a trigger misses some schedule to compute the next date to evaluate in the past.
      */
     ZonedDateTime previousEvaluationDate(ConditionContext conditionContext) throws IllegalVariableEvaluationException;
 
-    RecoverMissedSchedules getRecoverMissedSchedules();
-
     /**
      * Load the default RecoverMissedSchedules from plugin property, or else ALL.
     */
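Per the property description above, `recoverMissedSchedules` accepts `ALL`, `LAST`, or `NONE`, and defaults to `ALL` unless the global plugin configuration says otherwise. The resolution order can be sketched as follows (hypothetical helper, not Kestra's actual implementation):

```java
import java.util.Optional;

public class RecoverMissedSchedulesSketch {
    enum RecoverMissedSchedules { ALL, LAST, NONE }

    // The trigger-level value wins; otherwise the plugin-configuration
    // default applies; otherwise ALL is the documented fallback.
    static RecoverMissedSchedules resolve(RecoverMissedSchedules onTrigger,
                                          RecoverMissedSchedules fromPluginConfig) {
        return Optional.ofNullable(onTrigger)
            .or(() -> Optional.ofNullable(fromPluginConfig))
            .orElse(RecoverMissedSchedules.ALL);
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null));                        // ALL
        System.out.println(resolve(null, RecoverMissedSchedules.NONE)); // NONE
        System.out.println(resolve(RecoverMissedSchedules.LAST,
            RecoverMissedSchedules.NONE));                              // LAST
    }
}
```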


@@ -172,7 +172,7 @@ public class Trigger extends TriggerContext implements HasUID {
         if (abstractTrigger instanceof PollingTriggerInterface pollingTriggerInterface) {
             try {
-                nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, Optional.empty());
+                nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, lastTrigger);
             } catch (InvalidTriggerConfigurationException e) {
                 disabled = true;
             }


@@ -6,12 +6,9 @@ import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.executions.ExecutionTrigger;
 import io.kestra.core.models.tasks.Output;
 import io.kestra.core.models.flows.State;
-import io.kestra.core.runners.DefaultRunContext;
-import io.kestra.core.runners.FlowInputOutput;
 import io.kestra.core.runners.RunContext;
 import io.kestra.core.utils.IdUtils;
 import io.kestra.core.utils.ListUtils;
-import java.time.ZonedDateTime;
 import java.util.*;

 public abstract class TriggerService {
@@ -51,58 +48,6 @@ public abstract class TriggerService {
         return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext);
     }

-    public static Execution generateScheduledExecution(
-        AbstractTrigger trigger,
-        ConditionContext conditionContext,
-        TriggerContext context,
-        List<Label> labels,
-        Map<String, Object> inputs,
-        Map<String, Object> variables,
-        Optional<ZonedDateTime> scheduleDate
-    ) {
-        RunContext runContext = conditionContext.getRunContext();
-        ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);
-
-        List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
-        if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
-            // add a correlation ID if none exist
-            executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
-        }
-
-        Execution execution = Execution.builder()
-            .id(runContext.getTriggerExecutionId())
-            .tenantId(context.getTenantId())
-            .namespace(context.getNamespace())
-            .flowId(context.getFlowId())
-            .flowRevision(conditionContext.getFlow().getRevision())
-            .variables(conditionContext.getFlow().getVariables())
-            .labels(executionLabels)
-            .state(new State())
-            .trigger(executionTrigger)
-            .scheduleDate(scheduleDate.map(date -> date.toInstant()).orElse(null))
-            .build();
-
-        Map<String, Object> allInputs = new HashMap<>();
-        // add flow inputs with default value
-        var flow = conditionContext.getFlow();
-        if (flow.getInputs() != null) {
-            flow.getInputs().stream()
-                .filter(input -> input.getDefaults() != null)
-                .forEach(input -> allInputs.put(input.getId(), input.getDefaults()));
-        }
-        if (inputs != null) {
-            allInputs.putAll(inputs);
-        }
-
-        // add inputs and inject defaults
-        if (!allInputs.isEmpty()) {
-            FlowInputOutput flowInputOutput = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowInputOutput.class);
-            execution = execution.withInputs(flowInputOutput.readExecutionInputs(conditionContext.getFlow(), execution, allInputs));
-        }
-
-        return execution;
-    }
-
     private static Execution generateExecution(
         String id,
         AbstractTrigger trigger,
@@ -111,6 +56,7 @@ public abstract class TriggerService {
         ConditionContext conditionContext
     ) {
         List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(trigger.getLabels()));
+        executionLabels.add(new Label(Label.FROM, "trigger"));
         if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
             // add a correlation ID if none exist
             executionLabels.add(new Label(Label.CORRELATION_ID, id));

View File

@@ -67,6 +67,11 @@ public class ManualConstraintViolation<T> implements ConstraintViolation<T> {
             invalidValue
         )));
     }

+    public static <T> ConstraintViolationException toConstraintViolationException(
+        Set<? extends ConstraintViolation<?>> constraintViolations
+    ) {
+        return new ConstraintViolationException(constraintViolations);
+    }
+
     public String getMessageTemplate() {
         return "{messageTemplate}";

View File

@@ -36,7 +36,7 @@ public interface KvMetadataRepositoryInterface extends SaveRepositoryInterface<P
     );

     default PersistedKvMetadata delete(PersistedKvMetadata persistedKvMetadata) throws IOException {
-        return this.save(persistedKvMetadata.toBuilder().deleted(true).build());
+        return this.save(persistedKvMetadata.toDeleted());
     }

     /**

View File

@@ -1,10 +1,10 @@
 package io.kestra.core.repositories;

 import io.kestra.core.models.Setting;
+import jakarta.validation.ConstraintViolationException;

 import java.util.List;
 import java.util.Optional;
-import jakarta.validation.ConstraintViolationException;

 public interface SettingRepositoryInterface {
     Optional<Setting> findByKey(String key);
@@ -13,5 +13,7 @@ public interface SettingRepositoryInterface {
     Setting save(Setting setting) throws ConstraintViolationException;

+    Setting internalSave(Setting setting) throws ConstraintViolationException;
+
     Setting delete(Setting setting);
 }

View File

@@ -16,8 +16,8 @@ import java.util.function.Function;
 public interface TriggerRepositoryInterface extends QueryBuilderInterface<Triggers.Fields> {
     Optional<Trigger> findLast(TriggerContext trigger);

-    Optional<Trigger> findByExecution(Execution execution);
+    Optional<Trigger> findByUid(String uid);

     List<Trigger> findAll(String tenantId);

     List<Trigger> findAllForAllTenants();

View File

@@ -6,10 +6,12 @@ import com.google.common.base.CaseFormat;
 import com.google.common.collect.ImmutableMap;
 import io.kestra.core.exceptions.IllegalVariableEvaluationException;
 import io.kestra.core.metrics.MetricRegistry;
+import io.kestra.core.models.Plugin;
 import io.kestra.core.models.executions.AbstractMetricEntry;
 import io.kestra.core.models.property.Property;
 import io.kestra.core.models.tasks.Task;
 import io.kestra.core.models.triggers.AbstractTrigger;
+import io.kestra.core.plugins.PluginConfigurations;
 import io.kestra.core.services.KVStoreService;
 import io.kestra.core.storages.Storage;
 import io.kestra.core.storages.StorageInterface;
@@ -235,6 +237,14 @@ public class DefaultRunContext extends RunContext {
         return runContext;
     }

+    @Override
+    public RunContext cloneForPlugin(Plugin plugin) {
+        PluginConfigurations pluginConfigurations = applicationContext.getBean(PluginConfigurations.class);
+        DefaultRunContext runContext = clone();
+        runContext.pluginConfiguration = pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass());
+        return runContext;
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -589,6 +599,11 @@ public class DefaultRunContext extends RunContext {
         return localPath;
     }

+    @Override
+    public InputAndOutput inputAndOutput() {
+        return new InputAndOutputImpl(this.applicationContext, this);
+    }
+
     /**
      * Builder class for constructing new {@link DefaultRunContext} objects.
      */

View File

@@ -189,12 +189,11 @@ public final class ExecutableUtils {
         variables.put("taskRunIteration", currentTaskRun.getIteration());
     }

-    FlowInputOutput flowInputOutput = ((DefaultRunContext) runContext).getApplicationContext().getBean(FlowInputOutput.class);
     Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
     Execution execution = Execution
         .newExecution(
             flow,
-            (f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
+            (f, e) -> runContext.inputAndOutput().readInputs(f, e, inputs),
             newLabels,
             Optional.empty())
         .withTrigger(ExecutionTrigger.builder()

View File

@@ -3,13 +3,13 @@ package io.kestra.core.runners;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.kestra.core.encryption.EncryptionService;
 import io.kestra.core.exceptions.IllegalVariableEvaluationException;
-import io.kestra.core.exceptions.KestraRuntimeException;
+import io.kestra.core.exceptions.InputOutputValidationException;
 import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.flows.Data;
 import io.kestra.core.models.flows.DependsOn;
 import io.kestra.core.models.flows.FlowInterface;
 import io.kestra.core.models.flows.Input;
-import io.kestra.core.models.flows.Output;
 import io.kestra.core.models.flows.RenderableInput;
 import io.kestra.core.models.flows.Type;
 import io.kestra.core.models.flows.input.FileInput;
@@ -19,7 +19,6 @@ import io.kestra.core.models.property.Property;
 import io.kestra.core.models.property.PropertyContext;
 import io.kestra.core.models.property.URIFetcher;
 import io.kestra.core.models.tasks.common.EncryptedString;
-import io.kestra.core.models.validations.ManualConstraintViolation;
 import io.kestra.core.serializers.JacksonMapper;
 import io.kestra.core.storages.StorageContext;
 import io.kestra.core.storages.StorageInterface;
@@ -209,8 +208,8 @@ public class FlowInputOutput {
             .filter(InputAndValue::enabled)
             .map(it -> {
                 //TODO check to return all exception at-once.
-                if (it.exception() != null) {
-                    throw it.exception();
+                if (it.exceptions() != null && !it.exceptions().isEmpty()) {
+                    throw InputOutputValidationException.merge(it.exceptions());
                 }
                 return new AbstractMap.SimpleEntry<>(it.input().getId(), it.value());
             })
@@ -294,13 +293,9 @@ public class FlowInputOutput {
             try {
                 isInputEnabled = Boolean.TRUE.equals(runContext.renderTyped(dependsOnCondition.get()));
             } catch (IllegalVariableEvaluationException e) {
-                resolvable.resolveWithError(ManualConstraintViolation.toConstraintViolationException(
-                    "Invalid condition: " + e.getMessage(),
-                    input,
-                    (Class<Input>) input.getClass(),
-                    input.getId(),
-                    this
-                ));
+                resolvable.resolveWithError(
+                    InputOutputValidationException.of("Invalid condition: " + e.getMessage())
+                );
                 isInputEnabled = false;
             }
         }
@@ -333,7 +328,7 @@ public class FlowInputOutput {
             // validate and parse input value
             if (value == null) {
                 if (input.getRequired()) {
-                    resolvable.resolveWithError(input.toConstraintViolationException("missing required input", null));
+                    resolvable.resolveWithError(InputOutputValidationException.of("Missing required input:" + input.getId()));
                 } else {
                     resolvable.resolveWithValue(null);
                 }
@@ -343,17 +338,18 @@
                     parsedInput.ifPresent(parsed -> ((Input) resolvable.get().input()).validate(parsed.getValue()));
                     parsedInput.ifPresent(typed -> resolvable.resolveWithValue(typed.getValue()));
                 } catch (ConstraintViolationException e) {
-                    ConstraintViolationException exception = e.getConstraintViolations().size() == 1 ?
-                        input.toConstraintViolationException(List.copyOf(e.getConstraintViolations()).getFirst().getMessage(), value) :
-                        input.toConstraintViolationException(e.getMessage(), value);
-                    resolvable.resolveWithError(exception);
+                    Input<?> finalInput = input;
+                    Set<InputOutputValidationException> exceptions = e.getConstraintViolations().stream()
+                        .map(c -> InputOutputValidationException.of(c.getMessage(), finalInput))
+                        .collect(Collectors.toSet());
+                    resolvable.resolveWithError(exceptions);
                 }
             }
-        } catch (ConstraintViolationException e) {
-            resolvable.resolveWithError(e);
-        } catch (Exception e) {
-            ConstraintViolationException exception = input.toConstraintViolationException(e instanceof IllegalArgumentException ? e.getMessage() : e.toString(), resolvable.get().value());
-            resolvable.resolveWithError(exception);
+        } catch (IllegalArgumentException e) {
+            resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage(), input));
+        } catch (Exception e) {
+            resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage()));
         }

         return resolvable.get();
@@ -441,8 +437,12 @@
                     }
                     return entry;
                 });
-            } catch (Exception e) {
-                throw output.toConstraintViolationException(e.getMessage(), current);
+            } catch (IllegalArgumentException e) {
+                throw InputOutputValidationException.of(e.getMessage(), output);
+            } catch (Exception e) {
+                throw InputOutputValidationException.of(e.getMessage());
             }
         })
         .filter(Optional::isPresent)
@@ -505,7 +505,7 @@ public class FlowInputOutput {
             if (matcher.matches()) {
                 yield current.toString();
             } else {
-                throw new IllegalArgumentException("Expected `URI` but received `" + current + "`");
+                throw new IllegalArgumentException("Invalid URI format.");
             }
         }
         case ARRAY, MULTISELECT -> {
@@ -535,34 +535,10 @@ public class FlowInputOutput {
         } catch (IllegalArgumentException e) {
             throw e;
         } catch (Throwable e) {
-            throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
+            throw new Exception(" errors:\n```\n" + e.getMessage() + "\n```");
        }
     }

-    public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
-        if (outputs == null) return Map.of();
-
-        // render required outputs
-        Map<String, Object> outputsById = outputs
-            .stream()
-            .filter(output -> output.getRequired() == null || output.getRequired())
-            .collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
-        outputsById = runContext.render(outputsById);
-
-        // render optional outputs one by one to catch, log, and skip any error.
-        for (io.kestra.core.models.flows.Output output : outputs) {
-            if (Boolean.FALSE.equals(output.getRequired())) {
-                try {
-                    outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
-                } catch (Exception e) {
-                    runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
-                    outputsById.put(output.getId(), null);
-                }
-            }
-        }
-        return outputsById;
-    }
-
     /**
      * Mutable wrapper to hold a flow's input, and it's resolved value.
      */
@@ -591,27 +567,30 @@ public class FlowInputOutput {
     }

     public void isDefault(boolean isDefault) {
-        this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
+        this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exceptions());
     }

     public void setInput(final Input<?> input) {
-        this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
+        this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exceptions());
     }

     public void resolveWithEnabled(boolean enabled) {
-        this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exception());
+        this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exceptions());
         markAsResolved();
     }

     public void resolveWithValue(@Nullable Object value) {
-        this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exception());
+        this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exceptions());
         markAsResolved();
     }

-    public void resolveWithError(@Nullable ConstraintViolationException exception) {
+    public void resolveWithError(@Nullable Set<InputOutputValidationException> exception) {
         this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
         markAsResolved();
     }

+    private void resolveWithError(@Nullable InputOutputValidationException exception) {
+        resolveWithError(Collections.singleton(exception));
+    }
+
     private void markAsResolved() {
         this.isResolved = true;
View File

@@ -0,0 +1,29 @@
+package io.kestra.core.runners;
+
+import io.kestra.core.exceptions.IllegalVariableEvaluationException;
+import io.kestra.core.models.executions.Execution;
+import io.kestra.core.models.flows.FlowInterface;
+import io.kestra.core.models.flows.Output;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * InputAndOutput could be used to work with flow execution inputs and outputs.
+ */
+public interface InputAndOutput {
+    /**
+     * Reads the inputs of a flow execution.
+     */
+    Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs);
+
+    /**
+     * Processes the outputs of a flow execution (parse them based on their types).
+     */
+    Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs);
+
+    /**
+     * Render flow execution outputs.
+     */
+    Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException;
+}

View File

@@ -0,0 +1,56 @@
+package io.kestra.core.runners;
+
+import io.kestra.core.exceptions.IllegalVariableEvaluationException;
+import io.kestra.core.models.executions.Execution;
+import io.kestra.core.models.flows.FlowInterface;
+import io.kestra.core.models.flows.Output;
+import io.micronaut.context.ApplicationContext;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class InputAndOutputImpl implements InputAndOutput {
+    private final FlowInputOutput flowInputOutput;
+    private final RunContext runContext;
+
+    InputAndOutputImpl(ApplicationContext applicationContext, RunContext runContext) {
+        this.flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
+        this.runContext = runContext;
+    }
+
+    @Override
+    public Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs) {
+        return flowInputOutput.readExecutionInputs(flow, execution, inputs);
+    }
+
+    @Override
+    public Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs) {
+        return flowInputOutput.typedOutputs(flow, execution, rOutputs);
+    }
+
+    @Override
+    public Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException {
+        if (outputs == null) return Map.of();
+
+        // render required outputs
+        Map<String, Object> outputsById = outputs
+            .stream()
+            .filter(output -> output.getRequired() == null || output.getRequired())
+            .collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
+        outputsById = runContext.render(outputsById);
+
+        // render optional outputs one by one to catch, log, and skip any error.
+        for (io.kestra.core.models.flows.Output output : outputs) {
+            if (Boolean.FALSE.equals(output.getRequired())) {
+                try {
+                    outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
+                } catch (Exception e) {
+                    runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
+                    outputsById.put(output.getId(), null);
+                }
+            }
+        }
+        return outputsById;
+    }
+}

View File
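The `renderOutputs()` logic moved into `InputAndOutputImpl` above renders required outputs as one batch (any failure propagates) but renders optional outputs one by one, nulling them on error. A standalone sketch of that contract; the `FlowOutput` record and the `renderer` function are stand-ins for Kestra's `Output` model and `RunContext.render()`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Stand-in for Kestra's flow Output model (id, template value, required flag).
record FlowOutput(String id, String value, Boolean required) {}

final class OutputRendering {
    // Required outputs: a rendering failure propagates to the caller.
    // Optional outputs: a rendering failure is swallowed and the output becomes null.
    static Map<String, Object> render(List<FlowOutput> outputs, Function<String, Object> renderer) {
        Map<String, Object> result = new HashMap<>();
        for (FlowOutput output : outputs) {
            boolean required = output.required() == null || output.required();
            if (required) {
                result.put(output.id(), renderer.apply(output.value()));
            } else {
                try {
                    result.put(output.id(), renderer.apply(output.value()));
                } catch (Exception e) {
                    result.put(output.id(), null); // log and skip in the real implementation
                }
            }
        }
        return result;
    }
}
```

This mirrors the design choice in the diff: a broken optional output degrades gracefully instead of failing the whole flow.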

@@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import io.kestra.core.encryption.EncryptionService;
 import io.kestra.core.exceptions.IllegalVariableEvaluationException;
+import io.kestra.core.models.Plugin;
 import io.kestra.core.models.executions.AbstractMetricEntry;
 import io.kestra.core.models.property.Property;
 import io.kestra.core.models.property.PropertyContext;
@@ -204,4 +205,15 @@ public abstract class RunContext implements PropertyContext {
      * when Namespace ACLs are used (EE).
      */
     public abstract AclChecker acl();

+    /**
+     * Clone this run context for a specific plugin.
+     *
+     * @return a new run context with the plugin configuration of the given plugin.
+     */
+    public abstract RunContext cloneForPlugin(Plugin plugin);
+
+    /**
+     * @return an InputAndOutput that can be used to work with inputs and outputs.
+     */
+    public abstract InputAndOutput inputAndOutput();
 }

View File

@@ -1,10 +1,8 @@
 package io.kestra.core.runners;

 import com.google.common.collect.Lists;
-import io.kestra.core.models.Plugin;
 import io.kestra.core.models.executions.TaskRun;
 import io.kestra.core.models.tasks.Task;
-import io.kestra.core.models.tasks.runners.TaskRunner;
 import io.kestra.core.models.triggers.AbstractTrigger;
 import io.kestra.core.models.triggers.TriggerContext;
 import io.kestra.core.plugins.PluginConfigurations;
@@ -53,20 +51,6 @@ public class RunContextInitializer {
     @Value("${kestra.encryption.secret-key}")
     protected Optional<String> secretKey;

-    /**
-     * Initializes the given {@link RunContext} for the given {@link Plugin}.
-     *
-     * @param runContext The {@link RunContext} to initialize.
-     * @param plugin     The {@link TaskRunner} used for initialization.
-     * @return The {@link RunContext} to initialize
-     */
-    public DefaultRunContext forPlugin(final DefaultRunContext runContext,
-                                       final Plugin plugin) {
-        runContext.init(applicationContext);
-        runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass()));
-        return runContext;
-    }
-
     /**
      * Initializes the given {@link RunContext} for the given {@link WorkerTask} for executor.
      *

View File

@@ -55,11 +55,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
     public RunContextLogger(QueueInterface<LogEntry> logQueue, LogEntry logEntry, org.slf4j.event.Level loglevel, boolean logToFile) {
         if (logEntry.getTaskId() != null) {
-            this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTaskId();
+            this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTaskId();
         } else if (logEntry.getTriggerId() != null) {
-            this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTriggerId();
+            this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTriggerId();
         } else {
-            this.loggerName = "flow." + logEntry.getFlowId();
+            this.loggerName = baseLoggerName(logEntry);
         }

         this.logQueue = logQueue;
@@ -68,6 +68,10 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
         this.logToFile = logToFile;
     }

+    private String baseLoggerName(LogEntry logEntry) {
+        return "flow." + logEntry.getTenantId() + "." + logEntry.getNamespace() + "." + logEntry.getFlowId();
+    }
+
     private static List<LogEntry> logEntry(ILoggingEvent event, String message, org.slf4j.event.Level level, LogEntry logEntry) {
         Iterable<String> split;

View File
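With the change above, logger names gain tenant and namespace qualifiers, which makes per-tenant or per-namespace log levels configurable through standard logger-name hierarchies. A hedged sketch of the resulting naming scheme (a standalone helper for illustration, not the actual `RunContextLogger` API):

```java
// Sketch of the new logger-name layout:
// flow.<tenantId>.<namespace>.<flowId>[.<taskId or triggerId>]
final class LoggerNames {
    static String base(String tenantId, String namespace, String flowId) {
        return "flow." + tenantId + "." + namespace + "." + flowId;
    }

    static String forTask(String tenantId, String namespace, String flowId, String taskId) {
        return base(tenantId, namespace, flowId) + "." + taskId;
    }
}
```

Note that namespaces themselves contain dots, so a name like `flow.prod.company.team.daily` is not unambiguously splittable; it is meant for hierarchical logger configuration, not for parsing components back out.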

@@ -8,6 +8,7 @@ import io.micronaut.core.annotation.Nullable;
 import io.pebbletemplates.pebble.PebbleEngine;
 import io.pebbletemplates.pebble.extension.Extension;
 import io.pebbletemplates.pebble.extension.Function;
+import io.pebbletemplates.pebble.lexer.Syntax;
 import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
@@ -37,6 +38,13 @@ public class PebbleEngineFactory {
         return builder.build();
     }

+    public PebbleEngine createWithCustomSyntax(Syntax syntax, Class<? extends Extension> extension) {
+        PebbleEngine.Builder builder = newPebbleEngineBuilder()
+            .syntax(syntax);
+        this.applicationContext.getBeansOfType(extension).forEach(builder::extension);
+        return builder.build();
+    }
+
     public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
         PebbleEngine.Builder builder = newPebbleEngineBuilder();

View File

@@ -35,6 +35,10 @@ public final class YamlParser {
         return read(input, cls, type(cls));
     }

+    public static <T> T parse(String input, Class<T> cls, Boolean strict) {
+        return strict ? read(input, cls, type(cls)) : readNonStrict(input, cls, type(cls));
+    }
+
     public static <T> T parse(Map<String, Object> input, Class<T> cls, Boolean strict) {
         ObjectMapper currentMapper = strict ? STRICT_MAPPER : NON_STRICT_MAPPER;
@@ -81,7 +85,31 @@ public final class YamlParser {
             throw toConstraintViolationException(input, resource, e);
         }
     }

+    private static <T> T readNonStrict(String input, Class<T> objectClass, String resource) {
+        try {
+            return NON_STRICT_MAPPER.readValue(input, objectClass);
+        } catch (JsonProcessingException e) {
+            throw toConstraintViolationException(input, resource, e);
+        }
+    }
+
+    private static String formatYamlErrorMessage(String originalMessage, JsonProcessingException e) {
+        StringBuilder friendlyMessage = new StringBuilder();
+        if (originalMessage.contains("Expected a field name")) {
+            friendlyMessage.append("YAML syntax error: Invalid structure. Check indentation and ensure all fields are properly formatted.");
+        } else if (originalMessage.contains("MappingStartEvent")) {
+            friendlyMessage.append("YAML syntax error: Unexpected mapping start. Verify that scalar values are properly quoted if needed.");
+        } else if (originalMessage.contains("Scalar value")) {
+            friendlyMessage.append("YAML syntax error: Expected a simple value but found complex structure. Check for unquoted special characters.");
+        } else {
+            friendlyMessage.append("YAML parsing error: ").append(originalMessage.replaceAll("org\\.yaml\\.snakeyaml.*", "").trim());
+        }
+        if (e.getLocation() != null) {
+            int line = e.getLocation().getLineNr();
+            friendlyMessage.append(String.format(" (at line %d)", line));
+        }
+        // Return a generic but cleaner message for other YAML errors
+        return friendlyMessage.toString();
+    }
+
     @SuppressWarnings("unchecked")
     public static <T> ConstraintViolationException toConstraintViolationException(T target, String resource, JsonProcessingException e) {
         if (e.getCause() instanceof ConstraintViolationException constraintViolationException) {
@@ -121,11 +149,12 @@ public final class YamlParser {
                 )
             ));
         } else {
+            String userFriendlyMessage = formatYamlErrorMessage(e.getMessage(), e);
             return new ConstraintViolationException(
-                "Illegal " + resource + " source: " + e.getMessage(),
+                "Illegal " + resource + " source: " + userFriendlyMessage,
                 Collections.singleton(
                     ManualConstraintViolation.of(
-                        e.getCause() == null ? e.getMessage() : e.getMessage() + "\nCaused by: " + e.getCause().getMessage(),
+                        userFriendlyMessage,
                         target,
                         (Class<T>) target.getClass(),
                         "yaml",
@@ -136,4 +165,3 @@ public final class YamlParser {
         }
     }
 }

View File
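The `formatYamlErrorMessage()` helper added above pattern-matches SnakeYAML's raw messages and rewrites them into friendlier ones. Its mapping can be exercised in isolation; this standalone copy reproduces the branching but omits the `JsonProcessingException` location suffix, so it is a simplified sketch rather than the exact method:

```java
// Standalone version of the message mapping in formatYamlErrorMessage()
// (location suffix omitted; input strings are raw parser messages).
final class YamlErrors {
    static String friendly(String original) {
        if (original.contains("Expected a field name")) {
            return "YAML syntax error: Invalid structure. Check indentation and ensure all fields are properly formatted.";
        }
        if (original.contains("MappingStartEvent")) {
            return "YAML syntax error: Unexpected mapping start. Verify that scalar values are properly quoted if needed.";
        }
        if (original.contains("Scalar value")) {
            return "YAML syntax error: Expected a simple value but found complex structure. Check for unquoted special characters.";
        }
        // Fallback: strip SnakeYAML class-name noise from the raw message.
        return "YAML parsing error: " + original.replaceAll("org\\.yaml\\.snakeyaml.*", "").trim();
    }
}
```

One design note: keying off substrings of parser messages is fragile across SnakeYAML versions, which is presumably why the fallback branch keeps the (cleaned) original message instead of dropping it.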

@@ -4,7 +4,6 @@ import com.cronutils.utils.VisibleForTesting;
 import io.kestra.core.exceptions.InternalException;
 import io.kestra.core.models.conditions.Condition;
 import io.kestra.core.models.conditions.ConditionContext;
-import io.kestra.core.models.conditions.ScheduleCondition;
 import io.kestra.core.models.executions.Execution;
 import io.kestra.core.models.flows.Flow;
 import io.kestra.core.models.flows.FlowInterface;
@@ -65,16 +64,6 @@ public class ConditionService {
         return this.valid(flow, conditions, conditionContext);
     }

-    /**
-     * Check that all conditions are valid.
-     * Warning, this method throws if a condition cannot be evaluated.
-     */
-    public boolean isValid(List<ScheduleCondition> conditions, ConditionContext conditionContext) throws InternalException {
-        return conditions
-            .stream()
-            .allMatch(throwPredicate(condition -> condition.test(conditionContext)));
-    }
-
     /**
      * Check that all conditions are valid.
      * Warning, this method throws if a condition cannot be evaluated.

View File

@@ -754,7 +754,7 @@ public class ExecutionService {
     var parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
     Execution newExecution = execution;
     if (parentTaskRun.getState().getCurrent() != State.Type.KILLED) {
-        newExecution = newExecution.withTaskRun(parentTaskRun.withState(State.Type.KILLED));
+        newExecution = newExecution.withTaskRun(parentTaskRun.withStateAndAttempt(State.Type.KILLED));
     }
     if (parentTaskRun.getParentTaskRunId() != null) {
         return killParentTaskruns(parentTaskRun, newExecution);

View File

@@ -92,7 +92,14 @@ public class FlowService {
     return flowRepository
         .orElseThrow(() -> new IllegalStateException("Cannot perform operation on flow. Cause: No FlowRepository"));
 }
+private static String formatValidationError(String message) {
+    if (message.startsWith("Illegal flow source:")) {
+        // Already formatted by YamlParser, return as-is
+        return message;
+    }
+    // For other validation errors, provide context
+    return "Validation error: " + message;
+}
 /**
  * Evaluates all checks defined in the given flow using the provided inputs.
  * <p>
@@ -174,10 +181,12 @@ public class FlowService {
     modelValidator.validate(pluginDefaultService.injectAllDefaults(flow, false));
 } catch (ConstraintViolationException e) {
-    validateConstraintViolationBuilder.constraints(e.getMessage());
+    String friendlyMessage = formatValidationError(e.getMessage());
+    validateConstraintViolationBuilder.constraints(friendlyMessage);
 } catch (FlowProcessingException e) {
-    if (e.getCause() instanceof ConstraintViolationException) {
-        validateConstraintViolationBuilder.constraints(e.getMessage());
+    if (e.getCause() instanceof ConstraintViolationException cve) {
+        String friendlyMessage = formatValidationError(cve.getMessage());
+        validateConstraintViolationBuilder.constraints(friendlyMessage);
     } else {
         Throwable cause = e.getCause() != null ? e.getCause() : e;
         validateConstraintViolationBuilder.constraints("Unable to validate the flow: " + cause.getMessage());
@@ -579,4 +588,4 @@ public class FlowService {
 private IllegalStateException noRepositoryException() {
     return new IllegalStateException("No repository found. Make sure the `kestra.repository.type` property is set.");
 }
 }
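The `formatValidationError` helper added above can be exercised in isolation. A minimal sketch replicating its two branches (the class name and standalone setup are illustrative, not part of the diff):

```java
public class ValidationMessageDemo {
    // Mirrors the formatValidationError helper from the FlowService diff:
    // messages already prefixed by YamlParser pass through unchanged,
    // all other validation messages gain a "Validation error: " prefix.
    static String formatValidationError(String message) {
        if (message.startsWith("Illegal flow source:")) {
            // Already formatted by YamlParser, return as-is
            return message;
        }
        return "Validation error: " + message;
    }

    public static void main(String[] args) {
        System.out.println(formatValidationError("id must not be blank"));
        System.out.println(formatValidationError("Illegal flow source: bad yaml"));
    }
}
```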

View File

@@ -1,6 +1,5 @@
 package io.kestra.core.storages;
-import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
 import io.kestra.core.services.NamespaceService;
 import jakarta.annotation.Nullable;
 import org.slf4j.Logger;
@@ -272,7 +271,13 @@ public class InternalStorage implements Storage {
     return this.storage.put(context.getTenantId(), context.getNamespace(), resolve, new BufferedInputStream(inputStream));
 }
+@Override
 public Optional<StorageContext.Task> getTaskStorageContext() {
     return Optional.ofNullable((context instanceof StorageContext.Task task) ? task : null);
 }
+@Override
+public List<FileAttributes> list(URI uri) throws IOException {
+    return this.storage.list(context.getTenantId(), context.getNamespace(), uri);
+}
 }

View File

@@ -173,4 +173,6 @@ public interface Storage {
  * @return the task storage context
  */
 Optional<StorageContext.Task> getTaskStorageContext();
+List<FileAttributes> list(URI uri) throws IOException;
 }

View File

@@ -1,7 +1,7 @@
 package io.kestra.core.test;
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import io.kestra.core.models.DeletedInterface;
+import io.kestra.core.models.SoftDeletable;
 import io.kestra.core.models.HasSource;
 import io.kestra.core.models.HasUID;
 import io.kestra.core.models.TenantInterface;
@@ -25,7 +25,7 @@ import java.util.List;
 @ToString
 @EqualsAndHashCode
 @TestSuiteValidation
-public class TestSuite implements HasUID, TenantInterface, DeletedInterface, HasSource {
+public class TestSuite implements HasUID, TenantInterface, SoftDeletable<TestSuite>, HasSource {
 @NotNull
 @NotBlank
@@ -85,10 +85,6 @@ public class TestSuite implements HasUID, TenantInterface, DeletedInterface, HasSource {
 );
 }
-public TestSuite delete() {
-    return this.toBuilder().deleted(true).build();
-}
 public TestSuite disable() {
     var disabled = true;
     return this.toBuilder()
@@ -120,4 +116,9 @@ public class TestSuite implements HasUID, TenantInterface, DeletedInterface, HasSource {
     return yamlSource + String.format("\ndisabled: %s", disabled);
 }
+@Override
+public TestSuite toDeleted() {
+    return toBuilder().deleted(true).build();
+}
 }
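The migration above replaces per-entity `delete()` methods with a shared `SoftDeletable` contract whose `toDeleted()` returns a copy flagged as deleted. A minimal sketch of the pattern, assuming an interface shaped roughly like what the imports and overrides suggest (the real `io.kestra.core.models.SoftDeletable` may declare more members):

```java
// Assumed shape of the soft-deletion contract; illustrative only.
interface SoftDeletable<T> {
    boolean isDeleted();
    T toDeleted(); // returns a deleted copy instead of removing the row
}

// Immutable record standing in for entities like TestSuite.
record Note(String id, boolean deleted) implements SoftDeletable<Note> {
    @Override
    public boolean isDeleted() {
        return deleted;
    }

    @Override
    public Note toDeleted() {
        return new Note(id, true);
    }
}

public class SoftDeleteDemo {
    public static void main(String[] args) {
        Note note = new Note("n1", false);
        Note deleted = note.toDeleted();
        // the original instance is untouched; only the copy is flagged
        System.out.println(note.isDeleted() + " " + deleted.isDeleted());
    }
}
```

Because entities are immutable (Lombok builders or records), `toDeleted()` flags a copy rather than mutating state, which keeps repositories free to persist either version.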

View File

@@ -1,6 +1,6 @@
 package io.kestra.core.test;
-import io.kestra.core.models.DeletedInterface;
+import io.kestra.core.models.SoftDeletable;
 import io.kestra.core.models.HasUID;
 import io.kestra.core.models.TenantInterface;
 import io.kestra.core.test.flow.UnitTestResult;
@@ -24,7 +24,7 @@ public record TestSuiteRunEntity(
 String flowId,
 TestState state,
 List<UnitTestResult> results
-) implements DeletedInterface, TenantInterface, HasUID {
+) implements SoftDeletable<TestSuiteRunEntity>, TenantInterface, HasUID {
 public static TestSuiteRunEntity create(String tenantId, TestSuiteUid testSuiteUid, TestSuiteRunResult testSuiteRunResult) {
     return new TestSuiteRunEntity(
@@ -43,23 +43,6 @@ public record TestSuiteRunEntity(
 );
 }
-public TestSuiteRunEntity delete() {
-    return new TestSuiteRunEntity(
-        this.uid,
-        this.id,
-        this.tenantId,
-        true,
-        this.startDate,
-        this.endDate,
-        this.testSuiteId,
-        this.testSuiteUid,
-        this.namespace,
-        this.flowId,
-        this.state,
-        this.results
-    );
-}
 /**
  * only used for backup
  * @param newTenantId the tenant to migrate to
@@ -86,6 +69,24 @@ public record TestSuiteRunEntity(
     return this.deleted;
 }
+@Override
+public TestSuiteRunEntity toDeleted() {
+    return new TestSuiteRunEntity(
+        this.uid,
+        this.id,
+        this.tenantId,
+        true,
+        this.startDate,
+        this.endDate,
+        this.testSuiteId,
+        this.testSuiteUid,
+        this.namespace,
+        this.flowId,
+        this.state,
+        this.results
+    );
+}
 @Override
 public String getTenantId() {
     return this.tenantId;

View File

@@ -1,13 +1,39 @@
 package io.kestra.core.utils;
+import io.kestra.core.models.Setting;
+import io.kestra.core.repositories.SettingRepositoryInterface;
+import jakarta.annotation.PostConstruct;
+import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
+import java.util.Optional;
 @Singleton
 public class EditionProvider {
 public Edition get() {
     return Edition.OSS;
 }
+@Inject
+private Optional<SettingRepositoryInterface> settingRepository; // repositories are not always there on unit tests
+@PostConstruct
+void start() {
+    // check the edition in the settings and update it if needed; persisting it would allow us to detect an incompatible update later
+    settingRepository.ifPresent(settingRepositoryInterface -> persistEdition(settingRepositoryInterface, get()));
+}
+private void persistEdition(SettingRepositoryInterface settingRepositoryInterface, Edition edition) {
+    Optional<Setting> versionSetting = settingRepositoryInterface.findByKey(Setting.INSTANCE_EDITION);
+    if (versionSetting.isEmpty() || !versionSetting.get().getValue().equals(edition)) {
+        settingRepositoryInterface.save(Setting.builder()
+            .key(Setting.INSTANCE_EDITION)
+            .value(edition)
+            .build()
+        );
+    }
+}
 public enum Edition {
     OSS,
     EE

View File

@@ -11,6 +11,11 @@ import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 import lombok.extern.slf4j.Slf4j;
+/**
+ * Utility class to create {@link java.util.concurrent.ExecutorService} instances.
+ * WARNING: those instances use the {@link ThreadUncaughtExceptionHandler}, which terminates Kestra if an error occurs in any thread,
+ * so they should not be used inside plugins.
+ */
 @Singleton
 @Slf4j
 public class ExecutorsUtils {

View File

@@ -10,7 +10,7 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
 /**
- * Utility class for logging
+ * Utility class for server logging
 */
@@ -18,7 +18,7 @@ public final class Logs {
 private static final String EXECUTION_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[execution: {}] ";
 private static final String TRIGGER_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[trigger: {}] ";
 private static final String TASKRUN_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[task: {}] [execution: {}] [taskrun: {}] ";
 private Logs() {}
 public static void logExecution(FlowId flow, Logger logger, Level level, String message, Object... args) {
@@ -29,7 +29,7 @@ public final class Logs {
 }
 /**
- * Log an {@link Execution} via the execution logger named: 'execution.{flowId}'.
+ * Log an {@link Execution} via the executor logger named: 'executor.{tenantId}.{namespace}.{flowId}'.
 */
 public static void logExecution(Execution execution, Level level, String message, Object... args) {
     Logger logger = logger(execution);
@@ -43,7 +43,7 @@ public final class Logs {
 }
 /**
- * Log a {@link TriggerContext} via the trigger logger named: 'trigger.{flowId}.{triggerId}'.
+ * Log a {@link TriggerContext} via the scheduler logger named: 'scheduler.{tenantId}.{namespace}.{flowId}.{triggerId}'.
 */
 public static void logTrigger(TriggerContext triggerContext, Level level, String message, Object... args) {
     Logger logger = logger(triggerContext);
@@ -57,7 +57,7 @@ public final class Logs {
 }
 /**
- * Log a {@link TaskRun} via the taskRun logger named: 'task.{flowId}.{taskId}'.
+ * Log a {@link TaskRun} via the worker logger named: 'worker.{tenantId}.{namespace}.{flowId}.{taskId}'.
 */
 public static void logTaskRun(TaskRun taskRun, Level level, String message, Object... args) {
     String prefix = TASKRUN_PREFIX_WITH_TENANT;
@@ -73,19 +73,19 @@ public final class Logs {
 private static Logger logger(TaskRun taskRun) {
     return LoggerFactory.getLogger(
-        "task." + taskRun.getFlowId() + "." + taskRun.getTaskId()
+        "worker." + taskRun.getTenantId() + "." + taskRun.getNamespace() + "." + taskRun.getFlowId() + "." + taskRun.getTaskId()
     );
 }
 private static Logger logger(TriggerContext triggerContext) {
     return LoggerFactory.getLogger(
-        "trigger." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
+        "scheduler." + triggerContext.getTenantId() + "." + triggerContext.getNamespace() + "." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
     );
 }
 private static Logger logger(Execution execution) {
     return LoggerFactory.getLogger(
-        "execution." + execution.getFlowId()
+        "executor." + execution.getTenantId() + "." + execution.getNamespace() + "." + execution.getFlowId()
     );
 }
 }
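The renamed loggers follow SLF4J's hierarchical dot-separated naming, so a logging backend such as Logback can set levels per component, tenant, or namespace. A minimal sketch of how the new worker logger name is composed (all values below are illustrative):

```java
public class LoggerNameDemo {
    // Mirrors the naming the Logs diff introduces:
    // 'worker.{tenantId}.{namespace}.{flowId}.{taskId}'
    static String workerLoggerName(String tenantId, String namespace, String flowId, String taskId) {
        return "worker." + tenantId + "." + namespace + "." + flowId + "." + taskId;
    }

    public static void main(String[] args) {
        // e.g. a backend rule on 'worker.main.company.team' would now match
        // every task of every flow in that tenant/namespace
        System.out.println(workerLoggerName("main", "company.team", "my-flow", "my-task"));
    }
}
```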

View File

@@ -120,7 +120,10 @@ public class MapUtils {
 private static Collection<?> mergeCollections(Collection<?> colA, Collection<?> colB) {
     List<Object> merged = new ArrayList<>(colA.size() + colB.size());
     merged.addAll(colA);
-    merged.addAll(colB);
+    if (!colB.isEmpty()) {
+        List<?> filtered = colB.stream().filter(it -> !colA.contains(it)).toList();
+        merged.addAll(filtered);
+    }
     return merged;
 }
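The change above makes collection merging deduplicating: elements of the second collection already present in the first are skipped rather than appended twice. A standalone sketch of the new behavior (class name and setup are illustrative):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class MergeCollectionsDemo {
    // Replicates the deduplicating merge from the MapUtils diff:
    // keep everything from colA, then add only the elements of colB
    // that colA does not already contain.
    static Collection<?> mergeCollections(Collection<?> colA, Collection<?> colB) {
        List<Object> merged = new ArrayList<>(colA.size() + colB.size());
        merged.addAll(colA);
        if (!colB.isEmpty()) {
            List<?> filtered = colB.stream().filter(it -> !colA.contains(it)).toList();
            merged.addAll(filtered);
        }
        return merged;
    }

    public static void main(String[] args) {
        // [1, 2] merged with [2, 3] keeps a single 2
        System.out.println(mergeCollections(List.of(1, 2), List.of(2, 3)));
    }
}
```

Note the check is `colA.contains(it)`, so duplicates that already exist inside `colB` itself are still appended as-is.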

View File

@@ -1,14 +1,12 @@
 package io.kestra.core.utils;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.kestra.core.models.executions.metrics.Counter;
 import io.kestra.core.models.executions.metrics.Timer;
 import io.kestra.core.models.tasks.FileExistComportment;
 import io.kestra.core.models.tasks.NamespaceFiles;
 import io.kestra.core.runners.RunContext;
 import io.kestra.core.storages.NamespaceFile;
-import jakarta.annotation.PostConstruct;
-import jakarta.inject.Inject;
-import jakarta.inject.Singleton;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.apache.commons.lang3.time.StopWatch;
@@ -19,28 +17,27 @@ import java.io.InputStream;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.*;
 import static io.kestra.core.utils.Rethrow.throwConsumer;
-@Singleton
-public class NamespaceFilesUtils {
-    @Inject
-    private ExecutorsUtils executorsUtils;
-    private ExecutorService executorService;
-    private int maxThreads;
-    @PostConstruct
-    public void postConstruct() {
-        this.maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32);
-        this.executorService = executorsUtils.maxCachedThreadPool(maxThreads, "namespace-file");
-    }
-    public void loadNamespaceFiles(
+public final class NamespaceFilesUtils {
+    private static final int maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32);
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
+        0,
+        maxThreads,
+        60L,
+        TimeUnit.SECONDS,
+        new SynchronousQueue<>(),
+        new ThreadFactoryBuilder().setNameFormat("namespace-files").build()
+    );
+    private NamespaceFilesUtils() {
+        // utility class pattern
+    }
+    public static void loadNamespaceFiles(
     RunContext runContext,
     NamespaceFiles namespaceFiles
 )
@@ -69,7 +66,7 @@ public class NamespaceFilesUtils {
 int parallelism = maxThreads / 2;
 Flux.fromIterable(matchedNamespaceFiles)
     .parallel(parallelism)
-    .runOn(Schedulers.fromExecutorService(executorService))
+    .runOn(Schedulers.fromExecutorService(EXECUTOR_SERVICE))
     .doOnNext(throwConsumer(nsFile -> {
         InputStream content = runContext.storage().getFile(nsFile.uri());
         Path path = folderPerNamespace ?
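The static pool above relies on `ThreadPoolExecutor`'s behavior with a `SynchronousQueue`: with zero core threads, each submitted task is handed directly to an idle thread or spawns a new one up to the maximum, and idle threads die after the 60-second keep-alive. A standalone sketch of the same construction, using a plain lambda thread factory instead of Guava's `ThreadFactoryBuilder` (names are illustrative):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ElasticPoolDemo {
    // Sketch of the pool the NamespaceFilesUtils diff builds: no core
    // threads, a hard cap, 60s keep-alive, and a SynchronousQueue so work
    // is handed off to threads instead of queueing unboundedly.
    static ThreadPoolExecutor build(int maxThreads) {
        return new ThreadPoolExecutor(
            0,
            maxThreads,
            60L,
            TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            r -> {
                Thread t = new Thread(r, "namespace-files-demo");
                t.setDaemon(true);
                return t;
            });
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = build(4);
        System.out.println(pool.getCorePoolSize() + " " + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

Beware that a `SynchronousQueue` pool rejects tasks once all `maxThreads` threads are busy, which is why the diff caps parallelism at `maxThreads / 2` when driving it from a Reactor `ParallelFlux`.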

View File

@@ -23,7 +23,6 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
 import io.kestra.core.services.StorageService;
 import io.kestra.core.storages.FileAttributes;
 import io.kestra.core.storages.StorageContext;
-import io.kestra.core.storages.StorageInterface;
 import io.kestra.core.storages.StorageSplitInterface;
 import io.kestra.core.utils.GraphUtils;
 import io.kestra.core.validations.NoSystemLabelValidation;
@@ -540,7 +539,7 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
 .numberOfBatches((Integer) taskRun.getOutputs().get(ExecutableUtils.TASK_VARIABLE_NUMBER_OF_BATCHES));
 try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
-    FileSerde.write(bos, FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext));
+    FileSerde.write(bos, runContext.inputAndOutput().renderOutputs(flow.getOutputs()));
     URI uri = runContext.storage().putFile(
         new ByteArrayInputStream(bos.toByteArray()),
         URI.create((String) taskRun.getOutputs().get("uri"))
@@ -602,9 +601,8 @@ public class ForEachItem extends Task implements FlowableTask<VoidOutput>, Child
 String subflowOutputsBase = (String) taskOutput.get(ExecutableUtils.TASK_VARIABLE_SUBFLOW_OUTPUTS_BASE_URI);
 URI subflowOutputsBaseUri = URI.create(StorageContext.KESTRA_PROTOCOL + subflowOutputsBase + "/");
-StorageInterface storage = ((DefaultRunContext) runContext).getApplicationContext().getBean(StorageInterface.class);
-if (storage.exists(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri)) {
-    List<FileAttributes> list = storage.list(runContext.flowInfo().tenantId(), runContext.flowInfo().namespace(), subflowOutputsBaseUri);
+if (runContext.storage().isFileExist(subflowOutputsBaseUri)) {
+    List<FileAttributes> list = runContext.storage().list(subflowOutputsBaseUri);
     if (!list.isEmpty()) {
         // Merge outputs from each sub-flow into a single stored in the internal storage.

View File

@@ -63,7 +63,8 @@ import java.util.*;
 - id: run_post_approval
   type: io.kestra.plugin.scripts.shell.Commands
-  runner: PROCESS
+  taskRunner:
+    type: io.kestra.plugin.core.runner.Process
   commands:
     - echo "Manual approval received! Continuing the execution..."

View File

@@ -18,7 +18,6 @@ import io.kestra.core.models.tasks.ExecutableTask;
 import io.kestra.core.models.tasks.Task;
 import io.kestra.core.runners.DefaultRunContext;
 import io.kestra.core.runners.ExecutableUtils;
-import io.kestra.core.runners.FlowInputOutput;
 import io.kestra.core.runners.FlowMetaStoreInterface;
 import io.kestra.core.runners.RunContext;
 import io.kestra.core.runners.SubflowExecution;
@@ -38,7 +37,6 @@ import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.ToString;
 import lombok.experimental.SuperBuilder;
-import org.slf4j.event.Level;
 import java.time.ZonedDateTime;
 import java.util.Collections;
@@ -246,11 +244,11 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
 if (subflowOutputs != null && !subflowOutputs.isEmpty()) {
     try {
-        Map<String, Object> rOutputs = FlowInputOutput.renderFlowOutputs(subflowOutputs, runContext);
-        FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
-        if (flow.getOutputs() != null && flowInputOutput != null) {
-            rOutputs = flowInputOutput.typedOutputs(flow, execution, rOutputs);
+        var inputAndOutput = runContext.inputAndOutput();
+        Map<String, Object> rOutputs = inputAndOutput.renderOutputs(subflowOutputs);
+        if (flow.getOutputs() != null) {
+            rOutputs = inputAndOutput.typedOutputs(flow, execution, rOutputs);
         }
         builder.outputs(rOutputs);
     } catch (Exception e) {

View File

@@ -260,8 +260,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
 }
 if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) {
-    NamespaceFilesUtils namespaceFilesUtils = ((DefaultRunContext) runContext).getApplicationContext().getBean(NamespaceFilesUtils.class);
-    namespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
+    NamespaceFilesUtils.loadNamespaceFiles(runContext, this.namespaceFiles);
 }
 if (this.inputFiles != null) {

View File

@@ -2,10 +2,8 @@ package io.kestra.plugin.core.namespace;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
 import io.kestra.core.storages.Namespace;
 import io.kestra.core.storages.NamespaceFile;
-import io.kestra.plugin.core.kv.Version;
 import io.micronaut.core.annotation.Introspected;
 import lombok.Getter;
 import lombok.NoArgsConstructor;

View File

@@ -0,0 +1,107 @@
package io.kestra.plugin.core.trigger;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Backfill;
import io.kestra.core.models.triggers.Schedulable;
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.LabelService;
import io.kestra.core.utils.ListUtils;
import java.time.ZonedDateTime;
import java.time.chrono.ChronoZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* Factory class for constructing a new {@link Execution} from a {@link Schedulable} trigger.
*
* @see io.kestra.plugin.core.trigger.Schedule
* @see io.kestra.plugin.core.trigger.ScheduleOnDates
*/
final class SchedulableExecutionFactory {
static Execution createFailedExecution(Schedulable trigger, ConditionContext conditionContext, TriggerContext triggerContext) throws IllegalVariableEvaluationException {
return Execution.builder()
.id(conditionContext.getRunContext().getTriggerExecutionId())
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.flowRevision(conditionContext.getFlow().getRevision())
.labels(SchedulableExecutionFactory.getLabels(trigger, conditionContext.getRunContext(), triggerContext.getBackfill(), conditionContext.getFlow()))
.state(new State().withState(State.Type.FAILED))
.build();
}
static Execution createExecution(Schedulable trigger, ConditionContext conditionContext, TriggerContext triggerContext, Map<String, Object> variables, ZonedDateTime scheduleDate) throws IllegalVariableEvaluationException {
RunContext runContext = conditionContext.getRunContext();
ExecutionTrigger executionTrigger = ExecutionTrigger.of((AbstractTrigger) trigger, variables);
List<Label> labels = getLabels(trigger, runContext, triggerContext.getBackfill(), conditionContext.getFlow());
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
executionLabels.add(new Label(Label.FROM, "trigger"));
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
// add a correlation ID if none exist
executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
}
Execution execution = Execution.builder()
.id(runContext.getTriggerExecutionId())
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.flowRevision(conditionContext.getFlow().getRevision())
.variables(conditionContext.getFlow().getVariables())
.labels(executionLabels)
.state(new State())
.trigger(executionTrigger)
.scheduleDate(Optional.ofNullable(scheduleDate).map(ChronoZonedDateTime::toInstant).orElse(null))
.build();
Map<String, Object> allInputs = getInputs(trigger, runContext, triggerContext.getBackfill());
// add inputs and inject defaults (FlowInputOutput handles defaults internally)
execution = execution.withInputs(runContext.inputAndOutput().readInputs(conditionContext.getFlow(), execution, allInputs));
return execution;
}
private static Map<String, Object> getInputs(Schedulable trigger, RunContext runContext, Backfill backfill) throws IllegalVariableEvaluationException {
Map<String, Object> inputs = new HashMap<>();
if (trigger.getInputs() != null) {
inputs.putAll(runContext.render(trigger.getInputs()));
}
if (backfill != null && backfill.getInputs() != null) {
inputs.putAll(runContext.render(backfill.getInputs()));
}
return inputs;
}
private static List<Label> getLabels(Schedulable trigger, RunContext runContext, Backfill backfill, FlowInterface flow) throws IllegalVariableEvaluationException {
List<Label> labels = LabelService.fromTrigger(runContext, flow, (AbstractTrigger) trigger);
if (backfill != null && backfill.getLabels() != null) {
for (Label label : backfill.getLabels()) {
final var value = runContext.render(label.value());
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}
return labels;
}
}
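`SchedulableExecutionFactory` only adds a correlation ID when the execution's labels carry none, while the `FROM` label is always set. A sketch of that de-duplication logic with plain maps; the actual `Label.FROM` and `Label.CORRELATION_ID` constants have string values not shown in the diff, so the keys below are purely illustrative:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CorrelationLabelDemo {
    // Mirrors the factory's label handling: always stamp where the
    // execution came from, but only generate a correlation ID when the
    // incoming labels do not already define one.
    static Map<String, String> withSystemLabels(Map<String, String> labels, String executionId) {
        Map<String, String> result = new LinkedHashMap<>(labels);
        result.put("system.from", "trigger");               // illustrative key for Label.FROM
        result.putIfAbsent("system.correlationId", executionId); // illustrative key for Label.CORRELATION_ID
        return result;
    }

    public static void main(String[] args) {
        System.out.println(withSystemLabels(Map.of(), "exec-1"));
        System.out.println(withSystemLabels(Map.of("system.correlationId", "existing"), "exec-2"));
    }
}
```

Keeping an existing correlation ID lets a chain of executions triggered from one another stay traceable under a single identifier.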

View File

@@ -6,9 +6,7 @@ import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser; import com.cronutils.parser.CronParser;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException; import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.annotations.PluginProperty;
@@ -16,12 +14,8 @@ import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext; import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.conditions.ScheduleCondition; import io.kestra.core.models.conditions.ScheduleCondition;
import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.*; import io.kestra.core.models.triggers.*;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContext;
import io.kestra.core.services.ConditionService;
import io.kestra.core.services.LabelService;
import io.kestra.core.utils.ListUtils; import io.kestra.core.utils.ListUtils;
import io.kestra.core.validations.ScheduleValidation; import io.kestra.core.validations.ScheduleValidation;
import io.kestra.core.validations.TimezoneId; import io.kestra.core.validations.TimezoneId;
@@ -29,6 +23,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid; import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Null; import jakarta.validation.constraints.Null;
import lombok.AccessLevel;
import lombok.*; import lombok.*;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -40,6 +35,8 @@ import java.time.temporal.ChronoUnit;
import java.util.*; import java.util.*;
import java.util.stream.Stream; import java.util.stream.Stream;
import static io.kestra.core.utils.Rethrow.throwPredicate;
@Slf4j @Slf4j
@SuperBuilder @SuperBuilder
@ToString @ToString
@@ -224,11 +221,7 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
@PluginProperty @PluginProperty
@Deprecated @Deprecated
private List<ScheduleCondition> scheduleConditions; private List<ScheduleCondition> scheduleConditions;
@Schema(
title = "The inputs to pass to the scheduled flow"
)
@PluginProperty(dynamic = true)
private Map<String, Object> inputs; private Map<String, Object> inputs;
@Schema( @Schema(
@@ -248,13 +241,7 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
@PluginProperty
@Deprecated
private Map<String, Object> backfill;
-@Schema(
-    title = "Action to take in the case of missed schedules",
-    description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
-        "The default is `ALL` unless a different value is configured using the global plugin configuration."
-)
-@PluginProperty
private RecoverMissedSchedules recoverMissedSchedules;
@Override
@@ -403,20 +390,11 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
    if (!conditionResults) {
        return Optional.empty();
    }
-} catch(InternalException ie) {
+} catch (InternalException ie) {
    // validate schedule condition can fail to render variables
    // in this case, we return a failed execution so the trigger is not evaluated each second
    runContext.logger().error("Unable to evaluate the Schedule trigger '{}'", this.getId(), ie);
-    Execution execution = Execution.builder()
-        .id(runContext.getTriggerExecutionId())
-        .tenantId(triggerContext.getTenantId())
-        .namespace(triggerContext.getNamespace())
-        .flowId(triggerContext.getFlowId())
-        .flowRevision(conditionContext.getFlow().getRevision())
-        .labels(generateLabels(runContext, conditionContext, backfill))
-        .state(new State().withState(State.Type.FAILED))
-        .build();
-    return Optional.of(execution);
+    return Optional.of(SchedulableExecutionFactory.createFailedExecution(this, conditionContext, triggerContext));
}
// recalculate true output for previous and next based on conditions
@@ -430,14 +408,12 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
    variables = scheduleDates.toMap();
}
-Execution execution = TriggerService.generateScheduledExecution(
+Execution execution = SchedulableExecutionFactory.createExecution(
    this,
    conditionContext,
    triggerContext,
-    generateLabels(runContext, conditionContext, backfill),
-    generateInputs(runContext, backfill),
    variables,
-    Optional.empty()
+    null
);
return Optional.of(execution);
@@ -448,34 +424,6 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
    return parser.parse(this.cron);
}
-private List<Label> generateLabels(RunContext runContext, ConditionContext conditionContext, Backfill backfill) throws IllegalVariableEvaluationException {
-    List<Label> labels = LabelService.fromTrigger(runContext, conditionContext.getFlow(), this);
-    if (backfill != null && backfill.getLabels() != null) {
-        for (Label label : backfill.getLabels()) {
-            final var value = runContext.render(label.value());
-            if (value != null) {
-                labels.add(new Label(label.key(), value));
-            }
-        }
-    }
-    return labels;
-}
-private Map<String, Object> generateInputs(RunContext runContext, Backfill backfill) throws IllegalVariableEvaluationException {
-    Map<String, Object> inputs = new HashMap<>();
-    if (this.inputs != null) {
-        inputs.putAll(runContext.render(this.inputs));
-    }
-    if (backfill != null && backfill.getInputs() != null) {
-        inputs.putAll(runContext.render(backfill.getInputs()));
-    }
-    return inputs;
-}
private Optional<Output> scheduleDates(ExecutionTime executionTime, ZonedDateTime date) {
    Optional<ZonedDateTime> next = executionTime.nextExecution(date.minus(Duration.ofSeconds(1)));
@@ -549,9 +497,9 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
Optional<ZonedDateTime> truePreviousNextDateWithCondition(ExecutionTime executionTime, ConditionContext conditionContext, ZonedDateTime toTestDate, boolean next) throws InternalException {
    int upperYearBound = ZonedDateTime.now().getYear() + 10;
    int lowerYearBound = ZonedDateTime.now().getYear() - 10;
    while ((next && toTestDate.getYear() < upperYearBound) || (!next && toTestDate.getYear() > lowerYearBound)) {
        Optional<ZonedDateTime> currentDate = next ?
            executionTime.nextExecution(toTestDate) :
            executionTime.lastExecution(toTestDate);
@@ -607,11 +555,10 @@ public class Schedule extends AbstractTrigger implements Schedulable, TriggerOut
private boolean validateScheduleCondition(ConditionContext conditionContext) throws InternalException {
    if (conditions != null) {
-        ConditionService conditionService = ((DefaultRunContext)conditionContext.getRunContext()).getApplicationContext().getBean(ConditionService.class);
-        return conditionService.isValid(
-            conditions.stream().filter(c -> c instanceof ScheduleCondition).map(c -> (ScheduleCondition) c).toList(),
-            conditionContext
-        );
+        return conditions.stream()
+            .filter(c -> c instanceof ScheduleCondition)
+            .map(c -> (ScheduleCondition) c)
+            .allMatch(throwPredicate(condition -> condition.test(conditionContext)));
    }
    return true;
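The rewritten `validateScheduleCondition` replaces the `ConditionService` bean lookup with a plain stream `allMatch`, using `Rethrow.throwPredicate` to use a checked-exception-throwing test inside the stream. A minimal stdlib sketch of that wrapper pattern (class and helper names here are illustrative, and Kestra's actual `Rethrow` utility differs in detail):

```java
import java.util.List;
import java.util.function.Predicate;

public class RethrowSketch {
    // A predicate variant that is allowed to throw a checked exception
    @FunctionalInterface
    interface CheckedPredicate<T> {
        boolean test(T t) throws Exception;
    }

    // Wraps a throwing predicate so it can be passed to Stream.allMatch,
    // tunneling any checked exception as an unchecked one
    static <T> Predicate<T> throwPredicate(CheckedPredicate<T> p) {
        return t -> {
            try {
                return p.test(t);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    public static void main(String[] args) {
        List<String> conditions = List.of("a", "ab", "abc");
        // allMatch short-circuits on the first failing condition
        boolean allValid = conditions.stream()
            .allMatch(throwPredicate(c -> !c.isEmpty()));
        System.out.println(allValid); // true
    }
}
```

The trade-off of this pattern is that the checked exception surfaces as a `RuntimeException` at the stream terminal operation, so the caller must unwrap it if it still wants the original type.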


@@ -10,7 +10,6 @@ import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.models.triggers.*;
import io.kestra.core.runners.RunContext;
-import io.kestra.core.services.LabelService;
import io.kestra.core.validations.TimezoneId;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
@@ -23,7 +22,10 @@ import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
-import java.util.*;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.function.Predicate;
import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -45,11 +47,7 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
@Builder.Default
@Null
private final Duration interval = null;
-@Schema(
-    title = "The inputs to pass to the scheduled flow"
-)
-@PluginProperty(dynamic = true)
private Map<String, Object> inputs;
@TimezoneId
@@ -63,31 +61,24 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
@NotNull
private Property<List<ZonedDateTime>> dates;
-@Schema(
-    title = "Action to take in the case of missed schedules",
-    description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
-        "The default is `ALL` unless a different value is configured using the global plugin configuration."
-)
-@PluginProperty
private RecoverMissedSchedules recoverMissedSchedules;
@Override
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext triggerContext) throws Exception {
    RunContext runContext = conditionContext.getRunContext();
    ZonedDateTime lastEvaluation = triggerContext.getDate();
    Optional<ZonedDateTime> nextDate = nextDate(runContext, date -> date.isEqual(lastEvaluation) || date.isAfter(lastEvaluation));
    if (nextDate.isPresent()) {
        log.info("Schedule execution on {}", nextDate.get());
-        Execution execution = TriggerService.generateScheduledExecution(
+        Execution execution = SchedulableExecutionFactory.createExecution(
            this,
            conditionContext,
            triggerContext,
-            LabelService.fromTrigger(runContext, conditionContext.getFlow(), this),
-            this.inputs != null ? runContext.render(this.inputs) : Collections.emptyMap(),
            Collections.emptyMap(),
-            nextDate
+            nextDate.orElse(null)
        );
        return Optional.of(execution);
@@ -97,29 +88,21 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
}
@Override
-public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> last) {
-    try {
-        return last
-            .map(throwFunction(context ->
-                nextDate(conditionContext.getRunContext(), date -> date.isAfter(context.getDate()))
-                    .orElse(ZonedDateTime.now().plusYears(1))
-            ))
-            .orElse(conditionContext.getRunContext()
-                .render(dates)
-                .asList(ZonedDateTime.class)
-                .stream()
-                .sorted()
-                .findFirst()
-                .orElse(ZonedDateTime.now()))
-            .truncatedTo(ChronoUnit.SECONDS);
-    } catch (IllegalVariableEvaluationException e) {
-        log.warn("Failed to evaluate schedule dates for trigger '{}': {}", this.getId(), e.getMessage());
-        return ZonedDateTime.now().plusYears(1);
-    }
+public ZonedDateTime nextEvaluationDate(ConditionContext conditionContext, Optional<? extends TriggerContext> triggerContext) {
+    return triggerContext
+        .map(ctx -> ctx.getBackfill() != null ? ctx.getBackfill().getCurrentDate() : ctx.getDate())
+        .map(this::withTimeZone)
+        .or(() -> Optional.of(ZonedDateTime.now()))
+        .flatMap(dt -> {
+            try {
+                return nextDate(conditionContext.getRunContext(), date -> date.isAfter(dt));
+            } catch (IllegalVariableEvaluationException e) {
+                log.warn("Failed to evaluate schedule dates for trigger '{}': {}", this.getId(), e.getMessage());
+                throw new InvalidTriggerConfigurationException("Failed to evaluate schedule 'dates'. Cause: " + e.getMessage());
+            }
+        }).orElseGet(() -> ZonedDateTime.now().plusYears(1));
}
@Override
public ZonedDateTime nextEvaluationDate() {
    // TODO this may be the next date from now?
@@ -139,9 +122,17 @@ public class ScheduleOnDates extends AbstractTrigger implements Schedulable, Tri
    return previousDates.isEmpty() ? ZonedDateTime.now() : previousDates.getFirst();
}
-private Optional<ZonedDateTime> nextDate(RunContext runContext, Predicate<ZonedDateTime> filter) throws IllegalVariableEvaluationException {
-    return runContext.render(dates).asList(ZonedDateTime.class).stream().sorted()
-        .filter(date -> filter.test(date))
+private ZonedDateTime withTimeZone(ZonedDateTime date) {
+    if (this.timezone == null) {
+        return date;
+    }
+    return date.withZoneSameInstant(ZoneId.of(this.timezone));
+}
+private Optional<ZonedDateTime> nextDate(RunContext runContext, Predicate<ZonedDateTime> predicate) throws IllegalVariableEvaluationException {
+    return runContext.render(dates)
+        .asList(ZonedDateTime.class).stream().sorted()
+        .filter(predicate)
        .map(throwFunction(date -> timezone == null ? date : date.withZoneSameInstant(ZoneId.of(runContext.render(timezone)))))
        .findFirst()
        .map(date -> date.truncatedTo(ChronoUnit.SECONDS));
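The new `withTimeZone` helper relies on `ZonedDateTime.withZoneSameInstant`, which keeps the absolute instant and only re-expresses it in the target zone. A small self-contained sketch of that behavior:

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class TimeZoneSketch {
    public static void main(String[] args) {
        // withZoneSameInstant preserves the instant and changes only
        // the zone used to display it, unlike withZoneSameLocal
        ZonedDateTime utc = ZonedDateTime.parse("2025-01-01T12:00:00Z");
        ZonedDateTime paris = utc.withZoneSameInstant(ZoneId.of("Europe/Paris"));
        System.out.println(paris.getHour());    // 13 (Paris is UTC+1 in January)
        System.out.println(utc.isEqual(paris)); // true: same instant
    }
}
```

This is why converting a trigger date this way never shifts the schedule itself; it only changes the zone in which subsequent comparisons are expressed.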


@@ -9,10 +9,14 @@
<property name="pattern" value="%date{HH:mm:ss}.%ms %highlight(%-5.5level) %magenta(%-12.36thread) %cyan(%-12.36logger{36}) %msg%n" />
<logger name="io.kestra" level="INFO" />
-<logger name="flow" level="INFO" />
-<logger name="task" level="INFO" />
-<logger name="execution" level="INFO" />
-<logger name="trigger" level="INFO" />
+<!-- Flow execution logs - disabled by default -->
+<logger name="flow" level="OFF" />
+<!-- Server loggers -->
+<logger name="worker" level="INFO" />
+<logger name="executor" level="INFO" />
+<logger name="scheduler" level="INFO" />
<logger name="io.kestra.ee.runner.kafka.services.KafkaConsumerService" level="WARN" />
<logger name="io.kestra.ee.runner.kafka.services.KafkaProducerService" level="WARN" />


@@ -170,10 +170,11 @@ class JsonSchemaGeneratorTest {
Map<String, Object> jsonSchema = jsonSchemaGenerator.generate(AbstractTrigger.class, AbstractTrigger.class);
assertThat((Map<String, Object>) jsonSchema.get("properties"), allOf(
-    Matchers.aMapWithSize(3),
+    Matchers.aMapWithSize(4),
    hasKey("conditions"),
    hasKey("stopAfter"),
-    hasKey("type")
+    hasKey("type"),
+    hasKey("allowConcurrent")
));
});
}


@@ -1,24 +1,36 @@
package io.kestra.core.models.property;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.serializers.FileSerde;
+import io.kestra.core.serializers.JacksonMapper;
+import io.kestra.core.storages.Namespace;
+import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.storages.StorageInterface;
+import io.kestra.plugin.core.namespace.Version;
+import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
import lombok.Builder;
import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import java.io.FileInputStream;
+import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static java.util.Map.entry;
@@ -362,10 +374,43 @@ class PropertyTest {
    assertThat(output.getMessages().getFirst().getValue()).isEqualTo("value1");
}
+@Test
+void jsonSubtype() throws JsonProcessingException, IllegalVariableEvaluationException {
+    Optional<WithSubtype> rendered = runContextFactory.of().render(
+        Property.<WithSubtype>ofExpression(JacksonMapper.ofJson().writeValueAsString(new MySubtype()))
+    ).as(WithSubtype.class);
+    assertThat(rendered).isPresent();
+    assertThat(rendered.get()).isInstanceOf(MySubtype.class);
+    List<WithSubtype> renderedList = runContextFactory.of().render(
+        Property.<List<WithSubtype>>ofExpression(JacksonMapper.ofJson().writeValueAsString(List.of(new MySubtype())))
+    ).asList(WithSubtype.class);
+    assertThat(renderedList).hasSize(1);
+    assertThat(renderedList.get(0)).isInstanceOf(MySubtype.class);
+}
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY)
+@JsonSubTypes({
+    @JsonSubTypes.Type(value = MySubtype.class, name = "mySubtype")
+})
+@Getter
+@NoArgsConstructor
+@Introspected
+public abstract static class WithSubtype {
+    abstract public String getType();
+}
+@Getter
+public static class MySubtype extends WithSubtype {
+    private final String type = "mySubtype";
+}
@Builder
@Getter
private static class TestObj {
    private String key;
    private String value;
}
}


@@ -60,6 +60,15 @@ class SystemInformationReportTest {
    return setting;
}
+@Override
+public Setting internalSave(Setting setting) throws ConstraintViolationException {
+    if (setting.getKey().equals(Setting.INSTANCE_UUID)) {
+        UUID = setting.getValue();
+    }
+    return setting;
+}
@Override
public Setting delete(Setting setting) {
    return setting;


@@ -1,9 +1,9 @@
package io.kestra.core.repositories;
import com.devskiller.friendly_id.FriendlyId;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
+import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.Label;
import io.kestra.core.models.QueryFilter;
@@ -24,7 +24,6 @@ import io.kestra.core.models.flows.State.Type;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
-import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.core.utils.TestsUtils;
@@ -42,10 +41,9 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;
import java.io.IOException;
-import java.sql.Timestamp;
-import java.time.*;
-import java.time.format.DateTimeFormatter;
-import java.time.temporal.ChronoUnit;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -185,6 +183,7 @@ public abstract class AbstractExecutionRepositoryTest {
@ParameterizedTest
@MethodSource("filterCombinations")
+@FlakyTest(description = "Filtering tests are sometimes returning 0")
void should_find_all(QueryFilter filter, int expectedSize){
    var tenant = TestsUtils.randomTenant(this.getClass().getSimpleName());
    inject(tenant, "executionTriggerId");


@@ -10,82 +10,84 @@ import org.junit.jupiter.api.TestInstance;
@KestraTest(startRunner = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractRunnerConcurrencyTest {
-    public static final String TENANT_1 = "tenant1";
    @Inject
    protected FlowConcurrencyCaseTest flowConcurrencyCaseTest;
    @Test
-    @LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
+    @LoadFlows(value = {"flows/valids/flow-concurrency-cancel.yml"}, tenantId = "concurrency-cancel")
    void concurrencyCancel() throws Exception {
-        flowConcurrencyCaseTest.flowConcurrencyCancel();
+        flowConcurrencyCaseTest.flowConcurrencyCancel("concurrency-cancel");
    }
    @Test
-    @LoadFlows({"flows/valids/flow-concurrency-fail.yml"})
+    @LoadFlows(value = {"flows/valids/flow-concurrency-fail.yml"}, tenantId = "concurrency-fail")
    void concurrencyFail() throws Exception {
-        flowConcurrencyCaseTest.flowConcurrencyFail();
+        flowConcurrencyCaseTest.flowConcurrencyFail("concurrency-fail");
    }
    @Test
-    @LoadFlows({"flows/valids/flow-concurrency-queue.yml"})
+    @LoadFlows(value = {"flows/valids/flow-concurrency-queue.yml"}, tenantId = "concurrency-queue")
    void concurrencyQueue() throws Exception {
-        flowConcurrencyCaseTest.flowConcurrencyQueue();
+        flowConcurrencyCaseTest.flowConcurrencyQueue("concurrency-queue");
    }
    @Test
-    @LoadFlows({"flows/valids/flow-concurrency-queue-pause.yml"})
+    @LoadFlows(value = {"flows/valids/flow-concurrency-queue-pause.yml"}, tenantId = "concurrency-queue-pause")
    protected void concurrencyQueuePause() throws Exception {
-        flowConcurrencyCaseTest.flowConcurrencyQueuePause();
+        flowConcurrencyCaseTest.flowConcurrencyQueuePause("concurrency-queue-pause");
    }
    @Test
-    @LoadFlows({"flows/valids/flow-concurrency-cancel-pause.yml"})
+    @LoadFlows(value = {"flows/valids/flow-concurrency-cancel-pause.yml"}, tenantId = "concurrency-cancel-pause")
    protected void concurrencyCancelPause() throws Exception {
-        flowConcurrencyCaseTest.flowConcurrencyCancelPause();
+        flowConcurrencyCaseTest.flowConcurrencyCancelPause("concurrency-cancel-pause");
    }
    @Test
-    @LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = TENANT_1)
+    @LoadFlows(value = {"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"}, tenantId = "flow-concurrency-with-for-each-item")
    protected void flowConcurrencyWithForEachItem() throws Exception {
-        flowConcurrencyCaseTest.flowConcurrencyWithForEachItem(TENANT_1);
+        flowConcurrencyCaseTest.flowConcurrencyWithForEachItem("flow-concurrency-with-for-each-item");
    }
    @Test
-    @LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
+    @LoadFlows(value = {"flows/valids/flow-concurrency-queue-fail.yml"}, tenantId = "concurrency-queue-restarted")
    protected void concurrencyQueueRestarted() throws Exception {
-        flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
+        flowConcurrencyCaseTest.flowConcurrencyQueueRestarted("concurrency-queue-restarted");
    }
    @Test
-    @LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
+    @LoadFlows(value = {"flows/valids/flow-concurrency-queue-after-execution.yml"}, tenantId = "concurrency-queue-after-execution")
    void concurrencyQueueAfterExecution() throws Exception {
-        flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
+        flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution("concurrency-queue-after-execution");
    }
    @Test
-    @LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = TENANT_1)
+    @LoadFlows(value = {"flows/valids/flow-concurrency-subflow.yml", "flows/valids/flow-concurrency-cancel.yml"}, tenantId = "flow-concurrency-subflow")
    void flowConcurrencySubflow() throws Exception {
-        flowConcurrencyCaseTest.flowConcurrencySubflow(TENANT_1);
+        flowConcurrencyCaseTest.flowConcurrencySubflow("flow-concurrency-subflow");
    }
    @Test
    @FlakyTest(description = "Only flaky in CI")
-    @LoadFlows({"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"})
+    @LoadFlows(
+        value = {"flows/valids/flow-concurrency-parallel-subflow-kill.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-child.yaml", "flows/valids/flow-concurrency-parallel-subflow-kill-grandchild.yaml"},
+        tenantId = "flow-concurrency-parallel-subflow-kill"
+    )
    protected void flowConcurrencyParallelSubflowKill() throws Exception {
-        flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill();
+        flowConcurrencyCaseTest.flowConcurrencyParallelSubflowKill("flow-concurrency-parallel-subflow-kill");
    }
    @Test
-    @LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
-    void flowConcurrencyKilled() throws Exception {
-        flowConcurrencyCaseTest.flowConcurrencyKilled();
-    }
-    @Test
    @FlakyTest(description = "Only flaky in CI")
-    @LoadFlows({"flows/valids/flow-concurrency-queue-killed.yml"})
+    @LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-killed")
+    void flowConcurrencyKilled() throws Exception {
+        flowConcurrencyCaseTest.flowConcurrencyKilled("flow-concurrency-killed");
+    }
+    @Test
+    @FlakyTest(description = "Only flaky in CI")
+    @LoadFlows(value = {"flows/valids/flow-concurrency-queue-killed.yml"}, tenantId = "flow-concurrency-queue-killed")
    void flowConcurrencyQueueKilled() throws Exception {
-        flowConcurrencyCaseTest.flowConcurrencyQueueKilled();
+        flowConcurrencyCaseTest.flowConcurrencyQueueKilled("flow-concurrency-queue-killed");
    }
}


@@ -6,6 +6,7 @@ import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
+import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
@@ -466,4 +467,20 @@ class ExecutionServiceTest {
    assertThat(restart.getTaskRunList()).hasSize(2);
    assertThat(restart.findTaskRunsByTaskId("make_error").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
+@Test
+@LoadFlows({"flows/valids/each-pause.yaml"})
+void killExecutionWithFlowableTask() throws Exception {
+    Execution execution = runnerUtils.runOneUntilPaused(MAIN_TENANT, "io.kestra.tests", "each-pause");
+    TaskRun childTaskRun = execution.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("pause")).toList().getFirst();
+    Execution killed = executionService.killParentTaskruns(childTaskRun, execution);
+    TaskRun parentTaskRun = killed.getTaskRunList().stream().filter(tr -> tr.getTaskId().equals("each_task")).toList().getFirst();
+    assertThat(parentTaskRun.getState().getCurrent()).isEqualTo(State.Type.KILLED);
+    assertThat(parentTaskRun.getAttempts().getLast().getState().getCurrent()).isEqualTo(State.Type.KILLED);
+}
}


@@ -31,7 +31,6 @@ import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
-import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@Singleton
@@ -57,12 +56,12 @@ public class FlowConcurrencyCaseTest {
@Named(QueueFactoryInterface.KILL_NAMED)
protected QueueInterface<ExecutionKilled> killQueue;
-public void flowConcurrencyCancel() throws TimeoutException, QueueException {
-    Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
+public void flowConcurrencyCancel(String tenantId) throws TimeoutException, QueueException {
+    Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
    try {
        List<Execution> shouldFailExecutions = List.of(
-            runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel"),
-            runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel")
+            runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-cancel"),
+            runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-cancel")
        );
        assertThat(execution1.getState().isRunning()).isTrue();
@@ -73,12 +72,12 @@ public class FlowConcurrencyCaseTest {
    }
}
-public void flowConcurrencyFail() throws TimeoutException, QueueException {
-    Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
+public void flowConcurrencyFail(String tenantId) throws TimeoutException, QueueException {
+    Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-fail", null, null, Duration.ofSeconds(30));
    try {
        List<Execution> shouldFailExecutions = List.of(
-            runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail"),
-            runnerUtils.runOne(MAIN_TENANT, NAMESPACE, "flow-concurrency-fail")
+            runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-fail"),
+            runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-fail")
        );
        assertThat(execution1.getState().isRunning()).isTrue();
@@ -89,10 +88,10 @@ public class FlowConcurrencyCaseTest {
} }
} }
public void flowConcurrencyQueue() throws QueueException { public void flowConcurrencyQueue(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30)); Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue", Optional.empty()) .findById(tenantId, NAMESPACE, "flow-concurrency-queue", Optional.empty())
.orElseThrow(); .orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty()); Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2); Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
@@ -108,10 +107,10 @@ public class FlowConcurrencyCaseTest {
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING); assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
} }
public void flowConcurrencyQueuePause() throws QueueException { public void flowConcurrencyQueuePause(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause"); Execution execution1 = runnerUtils.runOneUntilPaused(tenantId, NAMESPACE, "flow-concurrency-queue-pause");
Flow flow = flowRepository Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-pause", Optional.empty()) .findById(tenantId, NAMESPACE, "flow-concurrency-queue-pause", Optional.empty())
.orElseThrow(); .orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty()); Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2); Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
@@ -127,10 +126,10 @@ public class FlowConcurrencyCaseTest {
assertThat(secondExecutionResult.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING); assertThat(secondExecutionResult.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
} }
public void flowConcurrencyCancelPause() throws QueueException { public void flowConcurrencyCancelPause(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilPaused(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause"); Execution execution1 = runnerUtils.runOneUntilPaused(tenantId, NAMESPACE, "flow-concurrency-cancel-pause");
Flow flow = flowRepository Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty()) .findById(tenantId, NAMESPACE, "flow-concurrency-cancel-pause", Optional.empty())
.orElseThrow(); .orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty()); Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.CANCELLED), execution2); Execution secondExecutionResult = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.CANCELLED), execution2);
@@ -166,11 +165,11 @@ public class FlowConcurrencyCaseTest {
.toList()).contains(Type.QUEUED); .toList()).contains(Type.QUEUED);
} }
public void flowConcurrencyQueueRestarted() throws Exception { public void flowConcurrencyQueueRestarted(String tenantId) throws Exception {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE,
"flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30)); "flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-fail", Optional.empty()) .findById(tenantId, NAMESPACE, "flow-concurrency-queue-fail", Optional.empty())
.orElseThrow(); .orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty()); Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.RUNNING), execution2); runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.RUNNING), execution2);
@@ -179,7 +178,10 @@ public class FlowConcurrencyCaseTest {
// we restart the first one, it should be queued then fail again. // we restart the first one, it should be queued then fail again.
Execution failedExecution = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution1); Execution failedExecution = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution1);
Execution restarted = executionService.restart(failedExecution, null); Execution restarted = executionService.restart(failedExecution, null);
Execution executionResult1 = runnerUtils.restartExecution(e -> e.getState().getCurrent().equals(Type.FAILED), restarted); Execution executionResult1 = runnerUtils.restartExecution(
e -> e.getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED) && e.getState().getCurrent().equals(Type.FAILED),
restarted
);
Execution executionResult2 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution2); Execution executionResult2 = runnerUtils.awaitExecution(e -> e.getState().getCurrent().equals(Type.FAILED), execution2);
assertThat(executionResult1.getState().getCurrent()).isEqualTo(Type.FAILED); assertThat(executionResult1.getState().getCurrent()).isEqualTo(Type.FAILED);
@@ -193,10 +195,10 @@ public class FlowConcurrencyCaseTest {
assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING); assertThat(executionResult2.getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
} }
public void flowConcurrencyQueueAfterExecution() throws QueueException { public void flowConcurrencyQueueAfterExecution(String tenantId) throws QueueException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30)); Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty()) .findById(tenantId, NAMESPACE, "flow-concurrency-queue-after-execution", Optional.empty())
.orElseThrow(); .orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty()); Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2); Execution executionResult2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), execution2);
@@ -216,15 +218,15 @@ public class FlowConcurrencyCaseTest {
List<Execution> subFlowExecs = runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-cancel"); List<Execution> subFlowExecs = runnerUtils.awaitFlowExecutionNumber(2, tenantId, NAMESPACE, "flow-concurrency-cancel");
assertThat(subFlowExecs).extracting(e -> e.getState().getCurrent()).containsExactlyInAnyOrder(Type.SUCCESS, Type.CANCELLED); assertThat(subFlowExecs).extracting(e -> e.getState().getCurrent()).containsExactlyInAnyOrder(Type.SUCCESS, Type.CANCELLED);
// run another execution to be sure that everything work (purge is correctly done) // run another execution to be sure that everything works (purge is correctly done)
Execution execution3 = runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-subflow"); Execution execution3 = runnerUtils.runOne(tenantId, NAMESPACE, "flow-concurrency-subflow");
assertThat(execution3.getState().getCurrent()).isEqualTo(Type.SUCCESS); assertThat(execution3.getState().getCurrent()).isEqualTo(Type.SUCCESS);
runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE, "flow-concurrency-cancel"); runnerUtils.awaitFlowExecution(e -> e.getState().getCurrent().equals(Type.SUCCESS), tenantId, NAMESPACE, "flow-concurrency-cancel");
} }
public void flowConcurrencyParallelSubflowKill() throws QueueException { public void flowConcurrencyParallelSubflowKill(String tenantId) throws QueueException {
Execution parent = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-parallel-subflow-kill", null, null, Duration.ofSeconds(30)); Execution parent = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-parallel-subflow-kill", null, null, Duration.ofSeconds(30));
Execution queued = runnerUtils.awaitFlowExecution(e -> e.getState().isQueued(), MAIN_TENANT, NAMESPACE, "flow-concurrency-parallel-subflow-kill-child"); Execution queued = runnerUtils.awaitFlowExecution(e -> e.getState().isQueued(), tenantId, NAMESPACE, "flow-concurrency-parallel-subflow-kill-child");
// Kill the parent // Kill the parent
killQueue.emit(ExecutionKilledExecution killQueue.emit(ExecutionKilledExecution
@@ -232,7 +234,7 @@ public class FlowConcurrencyCaseTest {
.state(ExecutionKilled.State.REQUESTED) .state(ExecutionKilled.State.REQUESTED)
.executionId(parent.getId()) .executionId(parent.getId())
.isOnKillCascade(true) .isOnKillCascade(true)
.tenantId(MAIN_TENANT) .tenantId(tenantId)
.build() .build()
); );
@@ -242,11 +244,11 @@ public class FlowConcurrencyCaseTest {
assertThat(terminated.getTaskRunList()).isNull(); assertThat(terminated.getTaskRunList()).isNull();
} }
public void flowConcurrencyKilled() throws QueueException, InterruptedException { public void flowConcurrencyKilled(String tenantId) throws QueueException, InterruptedException {
Flow flow = flowRepository Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty()) .findById(tenantId, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.orElseThrow(); .orElseThrow();
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30)); Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty())); Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty())); Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
@@ -261,7 +263,7 @@ public class FlowConcurrencyCaseTest {
.state(ExecutionKilled.State.REQUESTED) .state(ExecutionKilled.State.REQUESTED)
.executionId(execution1.getId()) .executionId(execution1.getId())
.isOnKillCascade(true) .isOnKillCascade(true)
.tenantId(MAIN_TENANT) .tenantId(tenantId)
.build() .build()
); );
@@ -279,20 +281,19 @@ public class FlowConcurrencyCaseTest {
assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED); assertThat(queued.getState().getCurrent()).isEqualTo(Type.QUEUED);
} finally { } finally {
// kill everything to avoid dangling executions // kill everything to avoid dangling executions
runnerUtils.killExecution(execution1);
runnerUtils.killExecution(execution2); runnerUtils.killExecution(execution2);
runnerUtils.killExecution(execution3); runnerUtils.killExecution(execution3);
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do // await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed"); runnerUtils.awaitFlowExecutionNumber(3, tenantId, NAMESPACE, "flow-concurrency-queue-killed");
} }
} }
public void flowConcurrencyQueueKilled() throws QueueException, InterruptedException { public void flowConcurrencyQueueKilled(String tenantId) throws QueueException, InterruptedException {
Flow flow = flowRepository Flow flow = flowRepository
.findById(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty()) .findById(tenantId, NAMESPACE, "flow-concurrency-queue-killed", Optional.empty())
.orElseThrow(); .orElseThrow();
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30)); Execution execution1 = runnerUtils.runOneUntilRunning(tenantId, NAMESPACE, "flow-concurrency-queue-killed", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty())); Execution execution2 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty())); Execution execution3 = runnerUtils.emitAndAwaitExecution(e -> e.getState().getCurrent().equals(Type.QUEUED), Execution.newExecution(flow, null, null, Optional.empty()));
@@ -307,7 +308,7 @@ public class FlowConcurrencyCaseTest {
.state(ExecutionKilled.State.REQUESTED) .state(ExecutionKilled.State.REQUESTED)
.executionId(execution2.getId()) .executionId(execution2.getId())
.isOnKillCascade(true) .isOnKillCascade(true)
.tenantId(MAIN_TENANT) .tenantId(tenantId)
.build() .build()
); );
@@ -322,11 +323,10 @@ public class FlowConcurrencyCaseTest {
} finally { } finally {
// kill everything to avoid dangling executions // kill everything to avoid dangling executions
runnerUtils.killExecution(execution1); runnerUtils.killExecution(execution1);
runnerUtils.killExecution(execution2);
runnerUtils.killExecution(execution3); runnerUtils.killExecution(execution3);
// await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do // await that they are all terminated, note that as KILLED is received twice, some messages would still be pending, but this is the best we can do
runnerUtils.awaitFlowExecutionNumber(3, MAIN_TENANT, NAMESPACE, "flow-concurrency-queue-killed"); runnerUtils.awaitFlowExecutionNumber(3, tenantId, NAMESPACE, "flow-concurrency-queue-killed");
} }
} }
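The stricter await predicate in the updated flowConcurrencyQueueRestarted() test no longer matches any FAILED state (which could be the original failure) but only a failure that happened after a RESTARTED transition appears in the state history. A minimal sketch of that predicate, using simplified stand-in types rather than Kestra's real classes:

```java
import java.util.List;
import java.util.function.Predicate;

// Sketch of the "restarted then failed" predicate: it only matches once the
// state history proves a RESTARTED transition occurred AND the current state
// is FAILED. Type and State are simplified stand-ins, not Kestra's classes.
public class RestartPredicateSketch {
    public enum Type { CREATED, RUNNING, RESTARTED, FAILED }

    public record State(Type current, List<Type> histories) {}

    public static final Predicate<State> RESTARTED_THEN_FAILED =
        s -> s.histories().contains(Type.RESTARTED) && s.current() == Type.FAILED;

    public static void main(String[] args) {
        // original failure: FAILED but never restarted, so it must not match
        State firstFailure = new State(Type.FAILED,
            List.of(Type.CREATED, Type.RUNNING, Type.FAILED));
        // failure after a restart: RESTARTED is in the history, so it matches
        State failureAfterRestart = new State(Type.FAILED,
            List.of(Type.CREATED, Type.RUNNING, Type.FAILED, Type.RESTARTED, Type.RUNNING, Type.FAILED));

        System.out.println(RESTARTED_THEN_FAILED.test(firstFailure));       // false
        System.out.println(RESTARTED_THEN_FAILED.test(failureAfterRestart)); // true
    }
}
```

Without the history check, the await could resolve against the first failure and race with the restart.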

View File

@@ -239,7 +239,7 @@ class FlowInputOutputTest {
// Then
Assertions.assertEquals(2, values.size());
Assertions.assertFalse(values.get(1).enabled());
-Assertions.assertNotNull(values.get(1).exception());
+Assertions.assertNotNull(values.get(1).exceptions());
}
@Test
@@ -257,7 +257,7 @@ class FlowInputOutputTest {
List<InputAndValue> values = flowInputOutput.validateExecutionInputs(List.of(input), null, DEFAULT_TEST_EXECUTION, data).block();
// Then
-Assertions.assertNull(values.getFirst().exception());
+Assertions.assertNull(values.getFirst().exceptions());
Assertions.assertFalse(storageInterface.exists(MAIN_TENANT, null, URI.create(values.getFirst().value().toString())));
}
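The exception() to exceptions() rename above suggests InputAndValue now carries every constraint violation for an input instead of only the first. A simplified stand-in (not Kestra's actual record) showing that shape:

```java
import java.util.List;

// Hypothetical simplified InputAndValue carrying a list of violations
// instead of a single exception, so callers can report them all at once.
public class InputAndValueSketch {
    public record InputAndValue(Object value, boolean enabled, List<Exception> exceptions) {}

    public static void main(String[] args) {
        InputAndValue invalid = new InputAndValue(null, false, List.of(
            new IllegalArgumentException("value `F` doesn't match the values `[A, B, C]`"),
            new IllegalArgumentException("value `H` doesn't match the values `[A, B, C]`")));
        // both violations are available, not just the first
        System.out.println(invalid.exceptions().size()); // 2
    }
}
```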

View File

@@ -2,6 +2,7 @@ package io.kestra.core.runners;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CharStreams;
+import io.kestra.core.exceptions.InputOutputValidationException;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
@@ -137,8 +138,8 @@ public class InputsTest {
void missingRequired() {
HashMap<String, Object> inputs = new HashMap<>(InputsTest.inputs);
inputs.put("string", null);
-ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(inputs, MAIN_TENANT));
+InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(inputs, MAIN_TENANT));
-assertThat(e.getMessage()).contains("Invalid input for `string`, missing required input, but received `null`");
+assertThat(e.getMessage()).contains("Missing required input:string");
}
@Test
@@ -232,9 +233,9 @@ public class InputsTest {
HashMap<String, Object> map = new HashMap<>(inputs);
map.put("validatedString", "foo");
-ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant4"));
+InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant4"));
-assertThat(e.getMessage()).contains("Invalid input for `validatedString`, it must match the pattern");
+assertThat(e.getMessage()).contains("Invalid value for input `validatedString`. Cause: it must match the pattern");
}
@Test
@@ -242,15 +243,15 @@ public class InputsTest {
void inputValidatedIntegerBadValue() {
HashMap<String, Object> mapMin = new HashMap<>(inputs);
mapMin.put("validatedInt", "9");
-ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant5"));
+InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant5"));
-assertThat(e.getMessage()).contains("Invalid input for `validatedInt`, it must be more than `10`, but received `9`");
+assertThat(e.getMessage()).contains("Invalid value for input `validatedInt`. Cause: it must be more than `10`");
HashMap<String, Object> mapMax = new HashMap<>(inputs);
mapMax.put("validatedInt", "21");
-e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant5"));
+e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant5"));
-assertThat(e.getMessage()).contains("Invalid input for `validatedInt`, it must be less than `20`, but received `21`");
+assertThat(e.getMessage()).contains("Invalid value for input `validatedInt`. Cause: it must be less than `20`");
}
@Test
@@ -258,15 +259,15 @@ public class InputsTest {
void inputValidatedDateBadValue() {
HashMap<String, Object> mapMin = new HashMap<>(inputs);
mapMin.put("validatedDate", "2022-01-01");
-ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant6"));
+InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant6"));
-assertThat(e.getMessage()).contains("Invalid input for `validatedDate`, it must be after `2023-01-01`, but received `2022-01-01`");
+assertThat(e.getMessage()).contains("Invalid value for input `validatedDate`. Cause: it must be after `2023-01-01`");
HashMap<String, Object> mapMax = new HashMap<>(inputs);
mapMax.put("validatedDate", "2024-01-01");
-e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant6"));
+e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant6"));
-assertThat(e.getMessage()).contains("Invalid input for `validatedDate`, it must be before `2023-12-31`, but received `2024-01-01`");
+assertThat(e.getMessage()).contains("Invalid value for input `validatedDate`. Cause: it must be before `2023-12-31`");
}
@Test
@@ -274,15 +275,15 @@ public class InputsTest {
void inputValidatedDateTimeBadValue() {
HashMap<String, Object> mapMin = new HashMap<>(inputs);
mapMin.put("validatedDateTime", "2022-01-01T00:00:00Z");
-ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant7"));
+InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant7"));
-assertThat(e.getMessage()).contains("Invalid input for `validatedDateTime`, it must be after `2023-01-01T00:00:00Z`, but received `2022-01-01T00:00:00Z`");
+assertThat(e.getMessage()).contains("Invalid value for input `validatedDateTime`. Cause: it must be after `2023-01-01T00:00:00Z`");
HashMap<String, Object> mapMax = new HashMap<>(inputs);
mapMax.put("validatedDateTime", "2024-01-01T00:00:00Z");
-e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant7"));
+e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant7"));
-assertThat(e.getMessage()).contains("Invalid input for `validatedDateTime`, it must be before `2023-12-31T23:59:59Z`");
+assertThat(e.getMessage()).contains("Invalid value for input `validatedDateTime`. Cause: it must be before `2023-12-31T23:59:59Z`");
}
@Test
@@ -290,15 +291,15 @@ public class InputsTest {
void inputValidatedDurationBadValue() {
HashMap<String, Object> mapMin = new HashMap<>(inputs);
mapMin.put("validatedDuration", "PT1S");
-ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant8"));
+InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant8"));
-assertThat(e.getMessage()).contains("Invalid input for `validatedDuration`, It must be more than `PT10S`, but received `PT1S`");
+assertThat(e.getMessage()).contains("Invalid value for input `validatedDuration`. Cause: It must be more than `PT10S`");
HashMap<String, Object> mapMax = new HashMap<>(inputs);
mapMax.put("validatedDuration", "PT30S");
-e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant8"));
+e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant8"));
-assertThat(e.getMessage()).contains("Invalid input for `validatedDuration`, It must be less than `PT20S`, but received `PT30S`");
+assertThat(e.getMessage()).contains("Invalid value for input `validatedDuration`. Cause: It must be less than `PT20S`");
}
@Test
@@ -306,15 +307,15 @@ public class InputsTest {
void inputValidatedFloatBadValue() {
HashMap<String, Object> mapMin = new HashMap<>(inputs);
mapMin.put("validatedFloat", "0.01");
-ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant9"));
+InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant9"));
-assertThat(e.getMessage()).contains("Invalid input for `validatedFloat`, it must be more than `0.1`, but received `0.01`");
+assertThat(e.getMessage()).contains("Invalid value for input `validatedFloat`. Cause: it must be more than `0.1`");
HashMap<String, Object> mapMax = new HashMap<>(inputs);
mapMax.put("validatedFloat", "1.01");
-e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant9"));
+e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant9"));
-assertThat(e.getMessage()).contains("Invalid input for `validatedFloat`, it must be less than `0.5`, but received `1.01`");
+assertThat(e.getMessage()).contains("Invalid value for input `validatedFloat`. Cause: it must be less than `0.5`");
}
@Test
@@ -322,15 +323,15 @@ public class InputsTest {
void inputValidatedTimeBadValue() {
HashMap<String, Object> mapMin = new HashMap<>(inputs);
mapMin.put("validatedTime", "00:00:01");
-ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMin, "tenant10"));
+InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMin, "tenant10"));
-assertThat(e.getMessage()).contains("Invalid input for `validatedTime`, it must be after `01:00`, but received `00:00:01`");
+assertThat(e.getMessage()).contains("Invalid value for input `validatedTime`. Cause: it must be after `01:00`");
HashMap<String, Object> mapMax = new HashMap<>(inputs);
mapMax.put("validatedTime", "14:00:00");
-e = assertThrows(ConstraintViolationException.class, () -> typedInputs(mapMax, "tenant10"));
+e = assertThrows(InputOutputValidationException.class, () -> typedInputs(mapMax, "tenant10"));
-assertThat(e.getMessage()).contains("Invalid input for `validatedTime`, it must be before `11:59:59`, but received `14:00:00`");
+assertThat(e.getMessage()).contains("Invalid value for input `validatedTime`. Cause: it must be before `11:59:59`");
}
@Test
@@ -339,9 +340,9 @@ public class InputsTest {
HashMap<String, Object> map = new HashMap<>(inputs);
map.put("uri", "http:/bla");
-ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant11"));
+InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant11"));
-assertThat(e.getMessage()).contains("Invalid input for `uri`, Expected `URI` but received `http:/bla`, but received `http:/bla`");
+assertThat(e.getMessage()).contains("Invalid value for input `uri`. Cause: Invalid URI format.");
}
@Test
@@ -350,9 +351,9 @@ public class InputsTest {
HashMap<String, Object> map = new HashMap<>(inputs);
map.put("enum", "INVALID");
-ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant12"));
+InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant12"));
-assertThat(e.getMessage()).isEqualTo("enum: Invalid input for `enum`, it must match the values `[ENUM_VALUE, OTHER_ONE]`, but received `INVALID`");
+assertThat(e.getMessage()).isEqualTo("Invalid value for input `enum`. Cause: it must match the values `[ENUM_VALUE, OTHER_ONE]`");
}
@Test
@@ -361,9 +362,9 @@ public class InputsTest {
HashMap<String, Object> map = new HashMap<>(inputs);
map.put("array", "[\"s1\", \"s2\"]");
-ConstraintViolationException e = assertThrows(ConstraintViolationException.class, () -> typedInputs(map, "tenant13"));
+InputOutputValidationException e = assertThrows(InputOutputValidationException.class, () -> typedInputs(map, "tenant13"));
-assertThat(e.getMessage()).contains("Invalid input for `array`, Unable to parse array element as `INT` on `s1`, but received `[\"s1\", \"s2\"]`");
+assertThat(e.getMessage()).contains("Invalid value for input `array`. Cause: Unable to parse array element as `INT` on `s1`");
}
@Test
@@ -467,7 +468,20 @@ public class InputsTest {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat((String) execution.findTaskRunsByTaskId("file").getFirst().getOutputs().get("value")).isEqualTo(file.toString());
}
+@Test
+@LoadFlows(value = "flows/invalids/inputs-with-multiple-constraint-violations.yaml")
+void multipleConstraintViolations() {
+    InputOutputValidationException ex = assertThrows(InputOutputValidationException.class, () -> runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "inputs-with-multiple-constraint-violations", null,
+        (f, e) -> flowIO.readExecutionInputs(f, e, Map.of("multi", List.of("F", "H")))));
+    List<String> messages = Arrays.asList(ex.getMessage().split(System.lineSeparator()));
+    assertThat(messages).containsExactlyInAnyOrder(
+        "Invalid value for input `multi`. Cause: you can't define both `values` and `options`",
+        "Invalid value for input `multi`. Cause: value `F` doesn't match the values `[A, B, C]`",
+        "Invalid value for input `multi`. Cause: value `H` doesn't match the values `[A, B, C]`"
+    );
+}
private URI createFile() throws IOException {
File tempFile = File.createTempFile("file", ".txt");
Files.write(tempFile.toPath(), "Hello World".getBytes());
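The commit message mentions a merge() function added to combine constraint violation messages, and the new multipleConstraintViolations() test splits the exception message on System.lineSeparator(). A plausible sketch of such a merge, one violation per line (the method name and signature are assumptions, not Kestra's actual API):

```java
import java.util.List;

// Hypothetical merge of several violation messages into one exception
// message, joined with the platform line separator so callers can split
// it back into individual violations.
public class ViolationMergeSketch {
    public static String merge(List<String> violationMessages) {
        // one violation message per line, in insertion order
        return String.join(System.lineSeparator(), violationMessages);
    }

    public static void main(String[] args) {
        String merged = merge(List.of(
            "Invalid value for input `multi`. Cause: value `F` doesn't match the values `[A, B, C]`",
            "Invalid value for input `multi`. Cause: value `H` doesn't match the values `[A, B, C]`"));
        System.out.println(merged.split(System.lineSeparator()).length); // 2
    }
}
```

Joining with the line separator keeps the merged message human-readable in logs while staying machine-splittable in tests.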

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners;
+import io.kestra.core.exceptions.InputOutputValidationException;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
@@ -71,6 +72,6 @@ public class NoEncryptionConfiguredTest implements TestPropertyProvider {
.flowId(flow.getId())
.build();
-assertThrows(ConstraintViolationException.class, () -> flowIO.readExecutionInputs(flow, execution, InputsTest.inputs));
+assertThrows(InputOutputValidationException.class, () -> flowIO.readExecutionInputs(flow, execution, InputsTest.inputs));
}
}


@@ -1,15 +1,24 @@
package io.kestra.core.utils;
-import io.kestra.core.junit.annotations.KestraTest;
+import io.kestra.core.models.Setting;
+import io.kestra.core.repositories.SettingRepositoryInterface;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.Optional;
+import static org.assertj.core.api.Assertions.assertThat;
-@KestraTest
+@MicronautTest
public class EditionProviderTest {
@Inject
private EditionProvider editionProvider;
+@Inject
+private SettingRepositoryInterface settingRepository;
protected EditionProvider.Edition expectedEdition() {
return EditionProvider.Edition.OSS;
}
@@ -17,5 +26,10 @@ public class EditionProviderTest {
@Test
void shouldReturnCurrentEdition() {
Assertions.assertEquals(expectedEdition(), editionProvider.get());
+// check that the edition is persisted in settings
+Optional<Setting> editionSettings = settingRepository.findByKey(Setting.INSTANCE_EDITION);
+assertThat(editionSettings).isPresent();
+assertThat(editionSettings.get().getValue()).isEqualTo(expectedEdition().name());
}
}


@@ -1,48 +1,107 @@
package io.kestra.core.utils;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.AppenderBase;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.TriggerContext;
import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import static org.assertj.core.api.Assertions.assertThat;
@Slf4j
class LogsTest {
+private static final InMemoryAppender MEMORY_APPENDER = new InMemoryAppender();
+@BeforeAll
+static void setupLogger() {
+Logger logger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+MEMORY_APPENDER.setContext((LoggerContext) LoggerFactory.getILoggerFactory());
+MEMORY_APPENDER.start();
+logger.addAppender(MEMORY_APPENDER);
+}
+@AfterEach
+void clearLogs() {
+MEMORY_APPENDER.clear();
+}
@Test
void logFlow() {
-var flow = Flow.builder().namespace("namespace").id("flow").build();
+var flow = Flow.builder().tenantId("tenant").namespace("namespace").id("flow").build();
Logs.logExecution(flow, log, Level.INFO, "Some log");
Logs.logExecution(flow, log, Level.INFO, "Some log with an {}", "attribute");
Logs.logExecution(flow, log, Level.ERROR, "Some log with an {} and an error", "attribute", new RuntimeException("Test Exception"));
+List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
+assertThat(logs).hasSize(3);
}
@Test
void logExecution() {
-var execution = Execution.builder().namespace("namespace").flowId("flow").id("execution").build();
+var execution = Execution.builder().tenantId("tenant").namespace("namespace").flowId("flow").id("execution").build();
+Logs.logExecution(execution, log, Level.INFO, "Some log");
+Logs.logExecution(execution, log, Level.INFO, "Some log with an {}", "attribute");
Logs.logExecution(execution, Level.INFO, "Some log");
-Logs.logExecution(execution, Level.INFO, "Some log with an {}", "attribute");
-Logs.logExecution(execution, Level.INFO, "Some log");
+List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
+assertThat(logs).hasSize(3);
+assertThat(logs.getFirst().getLoggerName()).isEqualTo("executor.tenant.namespace.flow");
}
@Test
void logTrigger() {
-var trigger = TriggerContext.builder().namespace("namespace").flowId("flow").triggerId("trigger").build();
+var trigger = TriggerContext.builder().tenantId("tenant").namespace("namespace").flowId("flow").triggerId("trigger").build();
+Logs.logTrigger(trigger, log, Level.INFO, "Some log");
+Logs.logTrigger(trigger, log, Level.INFO, "Some log with an {}", "attribute");
Logs.logTrigger(trigger, Level.INFO, "Some log");
-Logs.logTrigger(trigger, Level.INFO, "Some log with an {}", "attribute");
-Logs.logTrigger(trigger, Level.INFO, "Some log");
+List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
+assertThat(logs).hasSize(3);
+assertThat(logs.getFirst().getLoggerName()).isEqualTo("scheduler.tenant.namespace.flow.trigger");
}
@Test
void logTaskRun() {
-var taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").build();
+var taskRun = TaskRun.builder().tenantId("tenant").namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").build();
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
taskRun = TaskRun.builder().namespace("namespace").flowId("flow").executionId("execution").taskId("task").id("taskRun").value("value").build();
Logs.logTaskRun(taskRun, Level.INFO, "Some log");
Logs.logTaskRun(taskRun, Level.INFO, "Some log with an {}", "attribute");
+List<ILoggingEvent> logs = MEMORY_APPENDER.getLogs();
+assertThat(logs).hasSize(4);
+assertThat(logs.getFirst().getLoggerName()).isEqualTo("worker.tenant.namespace.flow.task");
+}
+private static class InMemoryAppender extends AppenderBase<ILoggingEvent> {
+private final List<ILoggingEvent> logs = new CopyOnWriteArrayList<>();
+@Override
+protected void append(ILoggingEvent event) {
+logs.add(event);
+}
+public List<ILoggingEvent> getLogs() {
+return logs;
+}
+public void clear() {
+logs.clear();
+}
+}
}


@@ -216,4 +216,23 @@ class MapUtilsTest {
"k1.k4", "v2"
));
}
+@Test
+@SuppressWarnings("unchecked")
+void mergeShouldNotDuplicateListElements() {
+Map<String, Object> first = Map.of(
+"key1", "value1",
+"key2", List.of("something", "else")
+);
+Map<String, Object> second = Map.of(
+"key2", List.of("something", "other"),
+"key3", "value3"
+);
+Map<String, Object> results = MapUtils.merge(first, second);
+assertThat(results).hasSize(3);
+List<String> list = (List<String>) results.get("key2");
+assertThat(list).hasSize(3);
+}
}
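The merge expectation above (a shared list element such as `"something"` appears only once in the merged list) can be sketched independently of Kestra's `MapUtils`. The `merge` helper below is a stand-in illustrating union-style list merging, not the real implementation:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

public class MergeSketch {
    // Stand-in for MapUtils.merge: the second map wins for scalar values,
    // while lists are merged as an order-preserving union without duplicates.
    static Map<String, Object> merge(Map<String, Object> a, Map<String, Object> b) {
        Map<String, Object> out = new HashMap<>(a);
        b.forEach((key, value) -> out.merge(key, value, (oldV, newV) -> {
            if (oldV instanceof List<?> l1 && newV instanceof List<?> l2) {
                // LinkedHashSet keeps insertion order and drops duplicates.
                LinkedHashSet<Object> union = new LinkedHashSet<>(l1);
                union.addAll(l2);
                return new ArrayList<>(union);
            }
            return newV;
        }));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> first = Map.of("key1", "value1", "key2", List.of("something", "else"));
        Map<String, Object> second = Map.of("key2", List.of("something", "other"), "key3", "value3");
        Map<String, Object> result = merge(first, second);
        System.out.println(result.size());
        System.out.println(((List<?>) result.get("key2")).size());
    }
}
```

With the test's inputs this yields three keys, and `key2` holds three elements rather than four, matching the assertions above.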


@@ -20,7 +20,6 @@ import org.junit.jupiter.api.parallel.ExecutionMode;
import reactor.core.publisher.Flux;
import java.io.ByteArrayInputStream;
-import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
@@ -45,9 +44,6 @@ class NamespaceFilesUtilsTest {
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
QueueInterface<LogEntry> workerTaskLogQueue;
-@Inject
-NamespaceFilesUtils namespaceFilesUtils;
@Inject
NamespaceFactory namespaceFactory;
@@ -66,7 +62,7 @@ class NamespaceFilesUtilsTest {
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
}
-namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build());
+NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().build());
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
receive.blockLast();
@@ -91,7 +87,7 @@ class NamespaceFilesUtilsTest {
namespaceStorage.putFile(Path.of("/" + i + ".txt"), data);
}
-namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
+NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
receive.blockLast();
@@ -116,7 +112,7 @@ class NamespaceFilesUtilsTest {
namespaceStorage.putFile(Path.of("/folder2/test.txt"), data);
namespaceStorage.putFile(Path.of("/test.txt"), data);
-namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
+NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder().namespaces(Property.ofValue(List.of(namespace))).build());
List<LogEntry> logEntry = TestsUtils.awaitLogs(logs, 1);
receive.blockLast();
@@ -141,7 +137,7 @@ class NamespaceFilesUtilsTest {
namespaceFactory.of(MAIN_TENANT, ns1, storageInterface).putFile(Path.of("/test.txt"), data);
namespaceFactory.of(MAIN_TENANT, ns2, storageInterface).putFile(Path.of("/test.txt"), data);
-namespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder()
+NamespaceFilesUtils.loadNamespaceFiles(runContext, NamespaceFiles.builder()
.namespaces(Property.ofValue(List.of(ns1, ns2)))
.folderPerNamespace(Property.ofValue(true))
.build());


@@ -0,0 +1,30 @@
package io.kestra.core.utils;
import io.kestra.core.models.Setting;
import io.kestra.core.repositories.SettingRepositoryInterface;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
@MicronautTest
class VersionProviderTest {
@Inject
private VersionProvider versionProvider;
@Inject
private SettingRepositoryInterface settingRepository;
@Test
void shouldResolveVersion() {
assertThat(versionProvider.getVersion()).endsWith("-SNAPSHOT");
// check that the version is persisted in settings
Optional<Setting> versionSettings = settingRepository.findByKey(Setting.INSTANCE_VERSION);
assertThat(versionSettings).isPresent();
assertThat(versionSettings.get().getValue()).isEqualTo(versionProvider.getVersion());
}
}


@@ -9,9 +9,15 @@ import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
+import io.kestra.core.models.validations.ValidateConstraintViolation;
+import io.kestra.core.services.FlowService;
import jakarta.validation.ConstraintViolationException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonLocation;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.List;
import java.io.File;
import java.net.URL;
import java.util.Optional;
@@ -23,6 +29,107 @@ class FlowValidationTest {
@Inject
private ModelValidator modelValidator;
+@Inject
+private FlowService flowService;
+private static final ObjectMapper mapper = new ObjectMapper();
+// Helper class to create JsonProcessingException with location
+private static class TestJsonProcessingException extends JsonProcessingException {
+public TestJsonProcessingException(String msg, JsonLocation location) {
+super(msg, location);
+}
+public TestJsonProcessingException(String msg) {
+super(msg);
+}
+}
+@Test
+void testFormatYamlErrorMessage_WithExpectedFieldName() throws JsonProcessingException {
+JsonProcessingException e = new TestJsonProcessingException("Expected a field name", new JsonLocation(null, 100, 5, 10));
+Object dummyTarget = new Object(); // Dummy target for toConstraintViolationException
+ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
+assertThat(result.getMessage()).contains("YAML syntax error: Invalid structure").contains("(at line 5)");
+}
+@Test
+void testFormatYamlErrorMessage_WithMappingStartEvent() throws JsonProcessingException {
+JsonProcessingException e = new TestJsonProcessingException("MappingStartEvent", new JsonLocation(null, 200, 3, 5));
+Object dummyTarget = new Object();
+ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
+assertThat(result.getMessage()).contains("YAML syntax error: Unexpected mapping start").contains("(at line 3)");
+}
+@Test
+void testFormatYamlErrorMessage_WithScalarValue() throws JsonProcessingException {
+JsonProcessingException e = new TestJsonProcessingException("Scalar value", new JsonLocation(null, 150, 7, 12));
+Object dummyTarget = new Object();
+ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
+assertThat(result.getMessage()).contains("YAML syntax error: Expected a simple value").contains("(at line 7)");
+}
+@Test
+void testFormatYamlErrorMessage_GenericError() throws JsonProcessingException {
+JsonProcessingException e = new TestJsonProcessingException("Some other error", new JsonLocation(null, 50, 2, 8));
+Object dummyTarget = new Object();
+ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
+assertThat(result.getMessage()).contains("YAML parsing error: Some other error").contains("(at line 2)");
+}
+@Test
+void testFormatYamlErrorMessage_NoLocation() throws JsonProcessingException {
+JsonProcessingException e = new TestJsonProcessingException("Expected a field name");
+Object dummyTarget = new Object();
+ConstraintViolationException result = YamlParser.toConstraintViolationException(dummyTarget, "test resource", e);
+assertThat(result.getMessage()).contains("YAML syntax error: Invalid structure").doesNotContain("at line");
+}
+@Test
+void testValidateFlowWithYamlSyntaxError() {
+String invalidYaml = """
+id: test-flow
+namespace: io.kestra.unittest
+tasks:
+- id:hello
+type: io.kestra.plugin.core.log.Log
+message: {{ abc }}
+""";
+List<ValidateConstraintViolation> results = flowService.validate("my-tenant", invalidYaml);
+assertThat(results).hasSize(1);
+assertThat(results.getFirst().getConstraints()).contains("YAML parsing error").contains("at line");
+}
+@Test
+void testValidateFlowWithUndefinedVariable() {
+String yamlWithUndefinedVar = """
+id: test-flow
+namespace: io.kestra.unittest
+tasks:
+- id: hello
+type: io.kestra.plugin.core.log.Log
+message: {{ undefinedVar }}
+""";
+List<ValidateConstraintViolation> results = flowService.validate("my-tenant", yamlWithUndefinedVar);
+assertThat(results).hasSize(1);
+assertThat(results.getFirst().getConstraints()).contains("Validation error");
+}
@Test
void invalidRecursiveFlow() {
Flow flow = this.parse("flows/invalids/recursive-flow.yaml");
@@ -130,4 +237,4 @@ class FlowValidationTest {
return YamlParser.parse(file, Flow.class);
}
}
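The five `testFormatYamlErrorMessage_*` tests above pin down a simple classification of Jackson's raw parse-error messages. A standalone sketch of that mapping (the `YamlErrorMessages.format` helper is hypothetical, not `YamlParser`'s actual method) could look like:

```java
public class YamlErrorMessages {
    // Hypothetical sketch of the classification the tests exercise: map
    // Jackson's raw error text to friendlier wording, and append the line
    // number when a location is known (lineNr <= 0 means unknown).
    static String format(String rawMessage, int lineNr) {
        String friendly;
        if (rawMessage.contains("Expected a field name")) {
            friendly = "YAML syntax error: Invalid structure";
        } else if (rawMessage.contains("MappingStartEvent")) {
            friendly = "YAML syntax error: Unexpected mapping start";
        } else if (rawMessage.contains("Scalar value")) {
            friendly = "YAML syntax error: Expected a simple value";
        } else {
            friendly = "YAML parsing error: " + rawMessage;
        }
        return lineNr > 0 ? friendly + " (at line " + lineNr + ")" : friendly;
    }

    public static void main(String[] args) {
        System.out.println(format("Expected a field name", 5));
        System.out.println(format("Some other error", 2));
        // No location available: the "(at line …)" suffix is omitted.
        System.out.println(format("Expected a field name", 0));
    }
}
```

Each branch corresponds to one of the assertion messages in the tests, including the no-location case where the line suffix is dropped.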


@@ -1,6 +1,7 @@
package io.kestra.plugin.core.flow;
import com.google.common.io.CharStreams;
+import io.kestra.core.exceptions.InputOutputValidationException;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.FlakyTest;
import io.kestra.core.junit.annotations.KestraTest;
@@ -328,12 +329,12 @@ public class PauseTest {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.PAUSED);
-ConstraintViolationException e = assertThrows(
-ConstraintViolationException.class,
+InputOutputValidationException e = assertThrows(
+InputOutputValidationException.class,
() -> executionService.resume(execution, flow, State.Type.RUNNING, Mono.empty(), Pause.Resumed.now()).block()
);
-assertThat(e.getMessage()).contains("Invalid input for `asked`, missing required input, but received `null`");
+assertThat(e.getMessage()).contains("Missing required input:asked");
}
@SuppressWarnings("unchecked")


@@ -8,6 +8,7 @@ import io.kestra.core.models.flows.Output;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.State.History;
import io.kestra.core.runners.DefaultRunContext;
+import io.kestra.core.runners.InputAndOutput;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.services.VariablesService;
import io.micronaut.context.ApplicationContext;
@@ -46,11 +47,15 @@ class SubflowTest {
@Mock
private ApplicationContext applicationContext;
+@Mock
+private InputAndOutput inputAndOutput;
@BeforeEach
void beforeEach() {
Mockito.when(applicationContext.getBean(VariablesService.class)).thenReturn(new VariablesService());
Mockito.when(runContext.logger()).thenReturn(LOG);
Mockito.when(runContext.getApplicationContext()).thenReturn(applicationContext);
+Mockito.when(runContext.inputAndOutput()).thenReturn(inputAndOutput);
}
@Test
@@ -118,7 +123,7 @@ class SubflowTest {
Map<String, Object> outputs = Map.of("key", "value");
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(outputs);
+Mockito.when(inputAndOutput.renderOutputs(Mockito.anyList())).thenReturn(Map.of("key", "value"));
Subflow subflow = Subflow.builder()
.outputs(outputs)
@@ -159,6 +164,7 @@ class SubflowTest {
Output output = Output.builder().id("key").value("value").build();
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(Map.of(output.getId(), output.getValue()));
+Mockito.when(inputAndOutput.typedOutputs(Mockito.any(), Mockito.any(), Mockito.anyMap())).thenReturn(Map.of("key", "value"));
Flow flow = Flow.builder()
.outputs(List.of(output))
.build();


@@ -57,7 +57,7 @@ class ScheduleOnDatesTest {
}
@Test
-public void shouldReturnFirstDateWhenNextEvaluationDateAndNoExistingTriggerDate() throws Exception {
+public void shouldReturnFirstDateWhenNextEvaluationDateAndNoExistingTriggerDate() {
// given
var now = ZonedDateTime.now();
var before = now.minusMinutes(1).truncatedTo(ChronoUnit.SECONDS);
@@ -75,7 +75,7 @@ class ScheduleOnDatesTest {
ZonedDateTime nextDate = scheduleOnDates.nextEvaluationDate(conditionContext, Optional.empty());
// then
-assertThat(nextDate).isEqualTo(before);
+assertThat(nextDate).isEqualTo(after);
}
@Test


@@ -13,6 +13,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Type; import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.StringInput; import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.flows.input.MultiselectInput;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.runners.RunContextFactory; import io.kestra.core.runners.RunContextFactory;
@@ -103,8 +104,9 @@ class ScheduleTest {
); );
assertThat(evaluate.isPresent()).isTrue(); assertThat(evaluate.isPresent()).isTrue();
assertThat(evaluate.get().getLabels()).hasSize(3); assertThat(evaluate.get().getLabels()).hasSize(4);
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID))); assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE"); assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
var vars = evaluate.get().getTrigger().getVariables(); var vars = evaluate.get().getTrigger().getVariables();
var inputs = evaluate.get().getInputs(); var inputs = evaluate.get().getInputs();
@@ -137,8 +139,9 @@ class ScheduleTest {
); );
assertThat(evaluate.isPresent()).isTrue(); assertThat(evaluate.isPresent()).isTrue();
assertThat(evaluate.get().getLabels()).hasSize(3); assertThat(evaluate.get().getLabels()).hasSize(4);
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID))); assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.key().equals(Label.CORRELATION_ID)));
assertTrue(evaluate.get().getLabels().stream().anyMatch(label -> label.equals(new Label(Label.FROM, "trigger"))));
assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE"); assertThat(evaluate.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
var inputs = evaluate.get().getInputs(); var inputs = evaluate.get().getInputs();
@@ -475,6 +478,81 @@ class ScheduleTest {
assertThat(result.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE"); assertThat(result.get().getVariables()).containsEntry("custom_var", "VARIABLE VALUE");
} }
@Test
void successWithMultiselectInputDefaults() throws Exception {
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 1 * *").build();
ZonedDateTime date = ZonedDateTime.now()
.withDayOfMonth(1)
.withHour(0)
.withMinute(0)
.withSecond(0)
.truncatedTo(ChronoUnit.SECONDS)
.minusMonths(1);
Optional<Execution> evaluate = trigger.evaluate(
conditionContextWithMultiselectInput(trigger),
triggerContext(date, trigger));
assertThat(evaluate.isPresent()).isTrue();
var inputs = evaluate.get().getInputs();
// Verify MULTISELECT input with explicit defaults works correctly
assertThat(inputs.get("multiselectInput")).isEqualTo(List.of("option1", "option2"));
}
@Test
void successWithMultiselectInputAutoSelectFirst() throws Exception {
Schedule trigger = Schedule.builder().id("schedule").type(Schedule.class.getName()).cron("0 0 1 * *").build();
ZonedDateTime date = ZonedDateTime.now()
.withDayOfMonth(1)
.withHour(0)
.withMinute(0)
.withSecond(0)
.truncatedTo(ChronoUnit.SECONDS)
.minusMonths(1);
Optional<Execution> evaluate = trigger.evaluate(
conditionContextWithMultiselectAutoSelectFirst(trigger),
triggerContext(date, trigger));
assertThat(evaluate.isPresent()).isTrue();
var inputs = evaluate.get().getInputs();
// Verify MULTISELECT input with autoSelectFirst defaults to first option
assertThat(inputs.get("multiselectAutoSelect")).isEqualTo(List.of("first"));
}
@Test
void successWithMultiselectInputProvidedValue() throws Exception {
// Test that provided values override defaults for MULTISELECT
Schedule trigger = Schedule.builder()
.id("schedule")
.type(Schedule.class.getName())
.cron("0 0 1 * *")
.inputs(Map.of("multiselectInput", List.of("option3")))
.build();
ZonedDateTime date = ZonedDateTime.now()
.withDayOfMonth(1)
.withHour(0)
.withMinute(0)
.withSecond(0)
.truncatedTo(ChronoUnit.SECONDS)
.minusMonths(1);
Optional<Execution> evaluate = trigger.evaluate(
conditionContextWithMultiselectInput(trigger),
triggerContext(date, trigger));
assertThat(evaluate.isPresent()).isTrue();
var inputs = evaluate.get().getInputs();
// Verify provided value overrides defaults
assertThat(inputs.get("multiselectInput")).isEqualTo(List.of("option3"));
}
private ConditionContext conditionContext(AbstractTrigger trigger) { private ConditionContext conditionContext(AbstractTrigger trigger) {
Flow flow = Flow.builder() Flow flow = Flow.builder()
.id(IdUtils.create()) .id(IdUtils.create())
@@ -504,17 +582,79 @@ class ScheduleTest {
.build(); .build();
} }
private ConditionContext conditionContextWithMultiselectInput(AbstractTrigger trigger) {
Flow flow = Flow.builder()
.id(IdUtils.create())
.namespace("io.kestra.tests")
.labels(
List.of(
new Label("flow-label-1", "flow-label-1"),
new Label("flow-label-2", "flow-label-2")))
.variables(Map.of("custom_var", "VARIABLE VALUE"))
.inputs(List.of(
MultiselectInput.builder()
.id("multiselectInput")
.type(Type.MULTISELECT)
.values(List.of("option1", "option2", "option3"))
.defaults(Property.ofValue(List.of("option1", "option2")))
.build()))
.build();
TriggerContext triggerContext = TriggerContext.builder()
.namespace(flow.getNamespace())
.flowId(flow.getId())
.triggerId(trigger.getId())
.build();
return ConditionContext.builder()
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(),
triggerContext, trigger))
.flow(flow)
.build();
}
private ConditionContext conditionContextWithMultiselectAutoSelectFirst(AbstractTrigger trigger) {
Flow flow = Flow.builder()
.id(IdUtils.create())
.namespace("io.kestra.tests")
.labels(
List.of(
new Label("flow-label-1", "flow-label-1"),
new Label("flow-label-2", "flow-label-2")))
.variables(Map.of("custom_var", "VARIABLE VALUE"))
.inputs(List.of(
MultiselectInput.builder()
.id("multiselectAutoSelect")
.type(Type.MULTISELECT)
.values(List.of("first", "second", "third"))
.autoSelectFirst(true)
.build()))
.build();
TriggerContext triggerContext = TriggerContext.builder()
.namespace(flow.getNamespace())
.flowId(flow.getId())
.triggerId(trigger.getId())
.build();
return ConditionContext.builder()
.runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(),
triggerContext, trigger))
.flow(flow)
.build();
}
private ZonedDateTime dateFromVars(String date, ZonedDateTime expexted) { private ZonedDateTime dateFromVars(String date, ZonedDateTime expexted) {
return ZonedDateTime.parse(date).withZoneSameInstant(expexted.getZone()); return ZonedDateTime.parse(date).withZoneSameInstant(expexted.getZone());
} }
@Test @Test
void shouldGetNextExecutionDateWithConditionMatchingFutureDate() throws InternalException { void shouldGetNextExecutionDateWithConditionMatchingFutureDate() throws InternalException {
ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris")); ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
OffsetTime before = now.minusHours(1).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
OffsetTime after = now.minusHours(4).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
Schedule trigger = Schedule.builder()
    .id("schedule").type(Schedule.class.getName())
    .cron("0 * * * *") // every hour
@@ -527,25 +667,25 @@ class ScheduleTest {
        .build()
    ))
    .build();
TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
ConditionContext conditionContext = ConditionContext.builder()
    .runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
    .build();
Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
assertThat(result).isNotEmpty();
}

@Test
void shouldGetNextExecutionDateWithConditionMatchingCurrentDate() throws InternalException {
ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
OffsetTime before = now.plusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
OffsetTime after = now.minusHours(2).toOffsetDateTime().toOffsetTime().withMinute(0).withSecond(0).withNano(0);
Schedule trigger = Schedule.builder()
    .id("schedule").type(Schedule.class.getName())
    .cron("*/30 * * * * *")
@@ -558,13 +698,13 @@ class ScheduleTest {
        .build()
    ))
    .build();
TriggerContext triggerContext = triggerContext(now, trigger).toBuilder().build();
ConditionContext conditionContext = ConditionContext.builder()
    .runContext(runContextInitializer.forScheduler((DefaultRunContext) runContextFactory.of(), triggerContext, trigger))
    .build();
Optional<ZonedDateTime> result = trigger.truePreviousNextDateWithCondition(trigger.executionTime(), conditionContext, now, true);
assertThat(result).isNotEmpty();
}
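Both tests pin their `before`/`after` window to the top of the hour around `now`. That truncation step can be sketched in plain `java.time`, independent of Kestra (the class and method names below are illustrative, not part of the codebase):

```java
import java.time.OffsetTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class ScheduleWindowSketch {
    // Truncate to the top of the hour, as the tests do with
    // withMinute(0).withSecond(0).withNano(0).
    static OffsetTime topOfHour(ZonedDateTime zdt) {
        return zdt.toOffsetDateTime().toOffsetTime()
            .withMinute(0).withSecond(0).withNano(0);
    }

    public static void main(String[] args) {
        ZonedDateTime now = ZonedDateTime.now().withZoneSameLocal(ZoneId.of("Europe/Paris"));
        OffsetTime before = topOfHour(now.plusHours(2));
        OffsetTime after = topOfHour(now.minusHours(2));
        // Whatever the current time, both bounds sit exactly on an hour boundary.
        System.out.println(before.getMinute() == 0 && after.getMinute() == 0);
    }
}
```

Truncating both bounds keeps the window stable regardless of when the test runs, which matters for a cron like `0 * * * *` that only fires on the hour.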

View File

@@ -0,0 +1,18 @@
id: inputs-with-multiple-constraint-violations
namespace: io.kestra.tests
inputs:
- id: multi
type: MULTISELECT
values:
- A
- B
- C
options:
- X
- Y
- Z
tasks:
- id: validMultiSelect
type: io.kestra.plugin.core.debug.Return
format: "{{inputs.multi}}"
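This fixture selects values (`A`, `B`, `C`) that are all outside its `options` (`X`, `Y`, `Z`), so every selected value violates the constraint — the multiple-violations-per-input case the new `InputOutputValidationException` handling is meant to report at once. A minimal, Kestra-independent sketch of collecting all violations instead of failing on the first (the helper name here is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class MultiselectValidationSketch {
    // Collect every out-of-options value rather than stopping at the first one,
    // mirroring the "multiple constraint violations for the same input" behavior.
    static List<String> violations(List<String> values, List<String> options) {
        List<String> errors = new ArrayList<>();
        for (String value : values) {
            if (!options.contains(value)) {
                errors.add("value '" + value + "' is not one of " + options);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        // A, B and C are all invalid against options X, Y, Z: three violations.
        List<String> errors = violations(List.of("A", "B", "C"), List.of("X", "Y", "Z"));
        System.out.println(errors.size());
    }
}
```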

View File

@@ -0,0 +1,10 @@
id: each-pause
namespace: io.kestra.tests
tasks:
- id: each_task
type: io.kestra.plugin.core.flow.ForEach
values: '["a", "b"]'
tasks:
- id: pause
type: io.kestra.plugin.core.flow.Pause

View File

@@ -8,4 +8,4 @@ concurrency:
 tasks:
   - id: sleep
     type: io.kestra.plugin.core.flow.Sleep
-    duration: PT10S
+    duration: PT2S

View File

@@ -402,10 +402,11 @@ public class ExecutorService {
 if (flow.getOutputs() != null) {
     RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution());
+    var inputAndOutput = runContext.inputAndOutput();
     try {
-        Map<String, Object> outputs = FlowInputOutput.renderFlowOutputs(flow.getOutputs(), runContext);
-        outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
+        Map<String, Object> outputs = inputAndOutput.renderOutputs(flow.getOutputs());
+        outputs = inputAndOutput.typedOutputs(flow, executor.getExecution(), outputs);
         newExecution = newExecution.withOutputs(outputs);
     } catch (Exception e) {
         Logs.logExecution(

View File

@@ -0,0 +1,22 @@
DROP INDEX logs_execution_id;
DROP INDEX logs_execution_id__task_id;
DROP INDEX logs_execution_id__taskrun_id;
DROP INDEX logs_namespace_flow;
ALTER table logs drop column "deleted";
CREATE INDEX IF NOT EXISTS logs_execution_id ON logs ("execution_id");
CREATE INDEX IF NOT EXISTS logs_execution_id__task_id ON logs ("execution_id", "task_id");
CREATE INDEX IF NOT EXISTS logs_execution_id__taskrun_id ON logs ("execution_id", "taskrun_id");
CREATE INDEX IF NOT EXISTS logs_namespace_flow ON logs ("tenant_id", "timestamp", "level", "namespace", "flow_id");
DROP INDEX IF EXISTS metrics_flow_id;
DROP INDEX IF EXISTS metrics_execution_id;
DROP INDEX IF EXISTS metrics_timestamp;
ALTER TABLE metrics drop column "deleted";
CREATE INDEX IF NOT EXISTS metrics_flow_id ON metrics ("tenant_id", "namespace", "flow_id");
CREATE INDEX IF NOT EXISTS metrics_execution_id ON metrics ("execution_id");
CREATE INDEX IF NOT EXISTS metrics_timestamp ON metrics ("tenant_id", "timestamp");

View File

@@ -1,6 +1,6 @@
 package io.kestra.runner.h2;

-import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
+import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;

-public class H2RunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
+public class H2RunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
 }

View File

@@ -0,0 +1,22 @@
ALTER TABLE logs DROP INDEX ix_execution_id;
ALTER TABLE logs DROP INDEX ix_execution_id__task_id;
ALTER TABLE logs DROP INDEX ix_execution_id__taskrun_id;
ALTER TABLE logs DROP INDEX ix_namespace_flow;
ALTER table logs drop column `deleted`;
ALTER TABLE logs ADD INDEX ix_execution_id (`execution_id`), ALGORITHM=INPLACE, LOCK=NONE;
ALTER TABLE logs ADD INDEX ix_execution_id__task_id (`execution_id`, `task_id`), ALGORITHM=INPLACE, LOCK=NONE;
ALTER TABLE logs ADD INDEX ix_execution_id__taskrun_id (`execution_id`, `taskrun_id`), ALGORITHM=INPLACE, LOCK=NONE;
ALTER TABLE logs ADD INDEX ix_namespace_flow (`tenant_id`, `timestamp`, `level`, `namespace`, `flow_id`), ALGORITHM=INPLACE, LOCK=NONE;
ALTER TABLE metrics DROP INDEX metrics_flow_id;
ALTER TABLE metrics DROP INDEX ix_metrics_execution_id;
ALTER TABLE metrics DROP INDEX metrics_timestamp;
ALTER TABLE metrics drop column `deleted`;
ALTER TABLE metrics ADD INDEX ix_metrics_flow_id (`tenant_id`, `namespace`, `flow_id`), ALGORITHM=INPLACE, LOCK=NONE;
ALTER TABLE metrics ADD INDEX ix_metrics_execution_id (`execution_id`), ALGORITHM=INPLACE, LOCK=NONE;
ALTER TABLE metrics ADD INDEX ix_metrics_timestamp (`tenant_id`, `timestamp`), ALGORITHM=INPLACE, LOCK=NONE;

View File

@@ -1,6 +1,6 @@
 package io.kestra.runner.mysql;

-import io.kestra.core.runners.AbstractRunnerConcurrencyTest;
+import io.kestra.jdbc.runner.JdbcConcurrencyRunnerTest;

-public class MysqlRunnerConcurrencyTest extends AbstractRunnerConcurrencyTest {
+public class MysqlRunnerConcurrencyTest extends JdbcConcurrencyRunnerTest {
 }

Some files were not shown because too many files have changed in this diff.