mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-26 05:00:31 -05:00
Compare commits
238 Commits
fix/missin
...
feat/asset
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c07ecd3d4 | ||
|
|
950223e0a8 | ||
|
|
d48f3b9bd9 | ||
|
|
291fba3281 | ||
|
|
db3b3236ac | ||
|
|
5a8a631b47 | ||
|
|
2da191896f | ||
|
|
111026369b | ||
|
|
e3a0e59e9c | ||
|
|
f352be5746 | ||
|
|
5fc6d0b5d7 | ||
|
|
de5750f656 | ||
|
|
fa870b8df2 | ||
|
|
fa4bf64a23 | ||
|
|
27f81b5b6d | ||
|
|
90d322cd67 | ||
|
|
04246ace13 | ||
|
|
e0e745cb91 | ||
|
|
c69ecd7200 | ||
|
|
4b3419bc15 | ||
|
|
352d4eb194 | ||
|
|
e433833e62 | ||
|
|
d16a8de90f | ||
|
|
4784e459d6 | ||
|
|
2abea0fcde | ||
|
|
5d5165b7b9 | ||
|
|
44d0c10713 | ||
|
|
167734e32a | ||
|
|
24e61c81c0 | ||
|
|
379764a033 | ||
|
|
d55dd275c3 | ||
|
|
f409657e8a | ||
|
|
22f0b3ffdf | ||
|
|
0d99dc6862 | ||
|
|
fd3adc48b8 | ||
|
|
1a8a47c8cd | ||
|
|
7ea95f393e | ||
|
|
6935900699 | ||
|
|
0bc8e8d74a | ||
|
|
7f77b24ae0 | ||
|
|
ec6820dc25 | ||
|
|
d94193c143 | ||
|
|
c9628047fa | ||
|
|
4cbc069af4 | ||
|
|
eabe573fe6 | ||
|
|
ecd64617c3 | ||
|
|
a5650bca0f | ||
|
|
ed59e262d4 | ||
|
|
a5f9d54f7d | ||
|
|
47f4f43198 | ||
|
|
5d31c97f7f | ||
|
|
f8107285c4 | ||
|
|
8dc8dc1796 | ||
|
|
834dfd2947 | ||
|
|
6edb88841f | ||
|
|
5653531628 | ||
|
|
ee61276106 | ||
|
|
abcf76f7b4 | ||
|
|
67ada7f61b | ||
|
|
0c13633f77 | ||
|
|
a6cf2015ff | ||
|
|
2f9216c70b | ||
|
|
1903e6fac5 | ||
|
|
2d2cb00cab | ||
|
|
01b5441d16 | ||
|
|
efc778e294 | ||
|
|
60235a4e73 | ||
|
|
b167c52e76 | ||
|
|
216b124294 | ||
|
|
b6e4df8de2 | ||
|
|
429e7c7945 | ||
|
|
e302b4be4a | ||
|
|
8e7ad9ae25 | ||
|
|
41a11abf16 | ||
|
|
1be16d5e9d | ||
|
|
e263224d7b | ||
|
|
12b89588a6 | ||
|
|
eae5eb80cb | ||
|
|
c0f6298484 | ||
|
|
ba1d6b2232 | ||
|
|
048dcb80cc | ||
|
|
a81de811d7 | ||
|
|
a960a9f982 | ||
|
|
c4d4fd935f | ||
|
|
f063a5a2d9 | ||
|
|
ac91d5605f | ||
|
|
e3d3c3651b | ||
|
|
5b6836237e | ||
|
|
2f8284b133 | ||
|
|
42992fd7c3 | ||
|
|
3a481f93d3 | ||
|
|
7e964ae563 | ||
|
|
25e54edbc9 | ||
|
|
e88dc7af76 | ||
|
|
b7a027f0dc | ||
|
|
98141d6010 | ||
|
|
bf119ab6df | ||
|
|
9bd6353b77 | ||
|
|
c0ab581cf1 | ||
|
|
0f38e19663 | ||
|
|
0c14ea621c | ||
|
|
fb14e57a7c | ||
|
|
09c707d865 | ||
|
|
86e08d71dd | ||
|
|
94c00cedeb | ||
|
|
eb12832b1e | ||
|
|
687cefdfb9 | ||
|
|
8eae8aba72 | ||
|
|
abdbb8d364 | ||
|
|
8a55ab3af6 | ||
|
|
b7cb933e1e | ||
|
|
3af003e5e4 | ||
|
|
c3861a5532 | ||
|
|
ae1f10f45a | ||
|
|
612dccfb8c | ||
|
|
2ae8df2f5f | ||
|
|
1abfa74a16 | ||
|
|
69a793b227 | ||
|
|
35ccb3e39b | ||
|
|
3a7fcb2aa1 | ||
|
|
103c5b92e9 | ||
|
|
5253eeef95 | ||
|
|
848f835191 | ||
|
|
3e55e67534 | ||
|
|
7bca8b4924 | ||
|
|
56febfb415 | ||
|
|
925b8c6954 | ||
|
|
708816fe67 | ||
|
|
5502473fa4 | ||
|
|
c6cf0147a4 | ||
|
|
2951f4b4bc | ||
|
|
4ea13e258b | ||
|
|
3f8dcb47fd | ||
|
|
42dc3b930c | ||
|
|
97a78abd28 | ||
|
|
b3b2ef1b5a | ||
|
|
596a26a137 | ||
|
|
8a9a1df436 | ||
|
|
55d0880ed3 | ||
|
|
a74ebd5cd6 | ||
|
|
f3aed38964 | ||
|
|
2595e56199 | ||
|
|
e821bd7f65 | ||
|
|
09762d2a8d | ||
|
|
018c22918f | ||
|
|
3e9c8cf7da | ||
|
|
008404e442 | ||
|
|
2b224bcde8 | ||
|
|
1977b61693 | ||
|
|
8e2267f86c | ||
|
|
24355c2a88 | ||
|
|
51adcfa908 | ||
|
|
a55baa1f96 | ||
|
|
32793fde18 | ||
|
|
4381d585ec | ||
|
|
e595e26c45 | ||
|
|
b833cf28b5 | ||
|
|
ac11e9545c | ||
|
|
a07df5f6cd | ||
|
|
f626c85346 | ||
|
|
e15b53ebb5 | ||
|
|
7edb6bc379 | ||
|
|
78c81f932b | ||
|
|
56bb3ca29c | ||
|
|
14029e8c14 | ||
|
|
bea3d63d89 | ||
|
|
24a3bbd303 | ||
|
|
f9932af2e8 | ||
|
|
e0410c8f24 | ||
|
|
424a6cb41a | ||
|
|
afde71e913 | ||
|
|
086c32e711 | ||
|
|
710abcfaac | ||
|
|
be951d015c | ||
|
|
a07260bef4 | ||
|
|
dd19f8391d | ||
|
|
354873e220 | ||
|
|
386d4a15f0 | ||
|
|
1b75f15680 | ||
|
|
957bf74d97 | ||
|
|
3cbad1ce0d | ||
|
|
760050e9fc | ||
|
|
43f47ec337 | ||
|
|
ad5521199a | ||
|
|
fe7849d7fe | ||
|
|
feeaeff0b2 | ||
|
|
ed6bc50163 | ||
|
|
069845f579 | ||
|
|
f613eb0433 | ||
|
|
e440c402b4 | ||
|
|
700527b5dc | ||
|
|
5245014a32 | ||
|
|
5db0f44fb6 | ||
|
|
a8635108b7 | ||
|
|
cd4470044e | ||
|
|
4ec7f23a7b | ||
|
|
107ba16ce3 | ||
|
|
042d548598 | ||
|
|
94bd6f0a1e | ||
|
|
f43f11e125 | ||
|
|
3dfa5f97c4 | ||
|
|
8898ba736b | ||
|
|
f5665bf719 | ||
|
|
b6db688003 | ||
|
|
93f5e366ed | ||
|
|
0465ffa5df | ||
|
|
e869c54883 | ||
|
|
32da15b2ea | ||
|
|
a72ecfc2eb | ||
|
|
7cb494b244 | ||
|
|
9d21ab4b26 | ||
|
|
8f3a5058b1 | ||
|
|
56fb304ff6 | ||
|
|
28370d80df | ||
|
|
cc5fd30b2c | ||
|
|
61c39d23c5 | ||
|
|
b5a40b2fcc | ||
|
|
825b9dbcdb | ||
|
|
393b132444 | ||
|
|
b0ce760e50 | ||
|
|
2d68dad70c | ||
|
|
d08a2d8930 | ||
|
|
2722735d2d | ||
|
|
e9b7d190d4 | ||
|
|
d6cfa01fd5 | ||
|
|
d8e3a9bd44 | ||
|
|
b18e3b76ef | ||
|
|
4660091dc9 | ||
|
|
067414ffbe | ||
|
|
26f6154eed | ||
|
|
ea44128d2b | ||
|
|
4602546045 | ||
|
|
aecd050314 | ||
|
|
d6c290cb91 | ||
|
|
56216ef0b4 | ||
|
|
e1f983cc2d | ||
|
|
68f92e1159 | ||
|
|
5b597b9520 |
1
.github/ISSUE_TEMPLATE/bug.yml
vendored
1
.github/ISSUE_TEMPLATE/bug.yml
vendored
@@ -2,6 +2,7 @@ name: Bug report
|
||||
description: Report a bug or unexpected behavior in the project
|
||||
|
||||
labels: ["bug", "area/backend", "area/frontend"]
|
||||
type: Bug
|
||||
|
||||
body:
|
||||
- type: markdown
|
||||
|
||||
1
.github/ISSUE_TEMPLATE/feature.yml
vendored
1
.github/ISSUE_TEMPLATE/feature.yml
vendored
@@ -2,6 +2,7 @@ name: Feature request
|
||||
description: Suggest a new feature or improvement to enhance the project
|
||||
|
||||
labels: ["enhancement", "area/backend", "area/frontend"]
|
||||
type: Feature
|
||||
|
||||
body:
|
||||
- type: textarea
|
||||
|
||||
99
.github/dependabot.yml
vendored
99
.github/dependabot.yml
vendored
@@ -26,7 +26,7 @@ updates:
|
||||
open-pull-requests-limit: 50
|
||||
labels: ["dependency-upgrade", "area/backend"]
|
||||
ignore:
|
||||
# Ignore versions of Protobuf that are equal to or greater than 4.0.0 as Orc still uses 3
|
||||
# Ignore versions of Protobuf >= 4.0.0 because Orc still uses version 3
|
||||
- dependency-name: "com.google.protobuf:*"
|
||||
versions: ["[4,)"]
|
||||
|
||||
@@ -44,68 +44,75 @@ updates:
|
||||
build:
|
||||
applies-to: version-updates
|
||||
patterns: ["@esbuild/*", "@rollup/*", "@swc/*"]
|
||||
|
||||
types:
|
||||
applies-to: version-updates
|
||||
patterns: ["@types/*"]
|
||||
|
||||
storybook:
|
||||
applies-to: version-updates
|
||||
patterns: ["@storybook/*"]
|
||||
patterns: ["storybook*", "@storybook/*", "eslint-plugin-storybook"]
|
||||
|
||||
vitest:
|
||||
applies-to: version-updates
|
||||
patterns: ["vitest", "@vitest/*"]
|
||||
patch:
|
||||
|
||||
major:
|
||||
update-types: ["major"]
|
||||
applies-to: version-updates
|
||||
exclude-patterns: [
|
||||
"@esbuild/*",
|
||||
"@rollup/*",
|
||||
"@swc/*",
|
||||
"@types/*",
|
||||
"storybook*",
|
||||
"@storybook/*",
|
||||
"eslint-plugin-storybook",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
# Temporary exclusion of these packages from major updates
|
||||
"eslint-plugin-vue",
|
||||
]
|
||||
|
||||
minor:
|
||||
update-types: ["minor"]
|
||||
applies-to: version-updates
|
||||
exclude-patterns: [
|
||||
"@esbuild/*",
|
||||
"@rollup/*",
|
||||
"@swc/*",
|
||||
"@types/*",
|
||||
"storybook*",
|
||||
"@storybook/*",
|
||||
"eslint-plugin-storybook",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
# Temporary exclusion of these packages from minor updates
|
||||
"moment-timezone",
|
||||
"monaco-editor",
|
||||
]
|
||||
|
||||
patch:
|
||||
update-types: ["patch"]
|
||||
applies-to: version-updates
|
||||
patterns: ["*"]
|
||||
exclude-patterns:
|
||||
[
|
||||
"@esbuild/*",
|
||||
"@rollup/*",
|
||||
"@swc/*",
|
||||
"@types/*",
|
||||
"storybook*",
|
||||
"@storybook/*",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
]
|
||||
update-types: ["patch"]
|
||||
minor:
|
||||
applies-to: version-updates
|
||||
patterns: ["*"]
|
||||
exclude-patterns: [
|
||||
"@esbuild/*",
|
||||
"@rollup/*",
|
||||
"@swc/*",
|
||||
"@types/*",
|
||||
"@storybook/*",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
# Temporary exclusion of packages below from minor updates
|
||||
"moment-timezone",
|
||||
"monaco-editor",
|
||||
]
|
||||
update-types: ["minor"]
|
||||
major:
|
||||
applies-to: version-updates
|
||||
patterns: ["*"]
|
||||
exclude-patterns: [
|
||||
"@esbuild/*",
|
||||
"@rollup/*",
|
||||
"@swc/*",
|
||||
"@types/*",
|
||||
"@storybook/*",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
# Temporary exclusion of packages below from major updates
|
||||
"eslint-plugin-storybook",
|
||||
"eslint-plugin-vue",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
]
|
||||
update-types: ["major"]
|
||||
ignore:
|
||||
# Ignore updates to monaco-yaml, version is pinned to 5.3.1 due to patch-package script additions
|
||||
- dependency-name: "monaco-yaml"
|
||||
versions:
|
||||
- ">=5.3.2"
|
||||
|
||||
# Ignore updates of version 1.x, as we're using the beta of 2.x (still in beta)
|
||||
ignore:
|
||||
# Ignore updates to monaco-yaml; version is pinned to 5.3.1 due to patch-package script additions
|
||||
- dependency-name: "monaco-yaml"
|
||||
versions: [">=5.3.2"]
|
||||
|
||||
# Ignore updates of version 1.x for vue-virtual-scroller, as the project uses the beta of 2.x
|
||||
- dependency-name: "vue-virtual-scroller"
|
||||
versions:
|
||||
- "1.x"
|
||||
versions: ["1.x"]
|
||||
|
||||
2
.github/pull_request_template.md
vendored
2
.github/pull_request_template.md
vendored
@@ -12,7 +12,7 @@ _Example: Replaces legacy scroll directive with the new API._
|
||||
### 🔗 Related Issue
|
||||
|
||||
Which issue does this PR resolve? Use [GitHub Keywords](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/using-keywords-in-issues-and-pull-requests#linking-a-pull-request-to-an-issue) to automatically link the pull request to the issue.
|
||||
_Example: Closes https://github.com/kestra-io/kestra/issues/12345._
|
||||
_Example: Closes https://github.com/kestra-io/kestra/issues/ISSUE_NUMBER._
|
||||
|
||||
### 🎨 Frontend Checklist
|
||||
|
||||
|
||||
1
.github/workflows/main-build.yml
vendored
1
.github/workflows/main-build.yml
vendored
@@ -64,6 +64,7 @@ jobs:
|
||||
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
|
||||
|
||||
publish-develop-maven:
|
||||
|
||||
1
.github/workflows/release-docker.yml
vendored
1
.github/workflows/release-docker.yml
vendored
@@ -32,3 +32,4 @@ jobs:
|
||||
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
2
.github/workflows/vulnerabilities-check.yml
vendored
2
.github/workflows/vulnerabilities-check.yml
vendored
@@ -43,7 +43,7 @@ jobs:
|
||||
|
||||
# Upload dependency check report
|
||||
- name: Upload dependency check report
|
||||
uses: actions/upload-artifact@v5
|
||||
uses: actions/upload-artifact@v6
|
||||
if: ${{ always() }}
|
||||
with:
|
||||
name: dependency-check-report
|
||||
|
||||
@@ -29,8 +29,8 @@ start_time2=$(date +%s)
|
||||
|
||||
echo "cd ./ui"
|
||||
cd ./ui
|
||||
echo "npm i"
|
||||
npm i
|
||||
echo "npm ci"
|
||||
npm ci
|
||||
|
||||
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
|
||||
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
|
||||
|
||||
184
build.gradle
184
build.gradle
@@ -21,7 +21,7 @@ plugins {
|
||||
|
||||
// test
|
||||
id "com.adarshr.test-logger" version "4.0.0"
|
||||
id "org.sonarqube" version "7.1.0.6387"
|
||||
id "org.sonarqube" version "7.2.1.6560"
|
||||
id 'jacoco-report-aggregation'
|
||||
|
||||
// helper
|
||||
@@ -171,13 +171,22 @@ allprojects {
|
||||
subprojects {subProj ->
|
||||
|
||||
if (subProj.name != 'platform' && subProj.name != 'jmh-benchmarks') {
|
||||
|
||||
apply plugin: "com.adarshr.test-logger"
|
||||
apply plugin: 'jacoco'
|
||||
|
||||
java {
|
||||
sourceCompatibility = targetJavaVersion
|
||||
targetCompatibility = targetJavaVersion
|
||||
}
|
||||
|
||||
configurations {
|
||||
agent {
|
||||
canBeResolved = true
|
||||
canBeConsumed = true
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
// Platform
|
||||
testAnnotationProcessor enforcedPlatform(project(":platform"))
|
||||
@@ -204,9 +213,16 @@ subprojects {subProj ->
|
||||
|
||||
//assertj
|
||||
testImplementation 'org.assertj:assertj-core'
|
||||
|
||||
agent "org.aspectj:aspectjweaver:1.9.25.1"
|
||||
|
||||
testImplementation platform("io.qameta.allure:allure-bom")
|
||||
testImplementation "io.qameta.allure:allure-junit5"
|
||||
}
|
||||
|
||||
def commonTestConfig = { Test t ->
|
||||
t.ignoreFailures = true
|
||||
|
||||
// set Xmx for test workers
|
||||
t.maxHeapSize = '4g'
|
||||
|
||||
@@ -223,13 +239,59 @@ subprojects {subProj ->
|
||||
t.environment 'ENV_TEST2', "Pass by env"
|
||||
|
||||
|
||||
if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
|
||||
// JUnit 5 parallel settings
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
|
||||
// if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
|
||||
// // JUnit 5 parallel settings
|
||||
// t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
|
||||
// t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
|
||||
// t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
|
||||
// t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
|
||||
// }
|
||||
}
|
||||
|
||||
tasks.register('integrationTest', Test) { Test t ->
|
||||
description = 'Runs integration tests'
|
||||
group = 'verification'
|
||||
|
||||
useJUnitPlatform {
|
||||
includeTags 'integration'
|
||||
}
|
||||
|
||||
testClassesDirs = sourceSets.test.output.classesDirs
|
||||
classpath = sourceSets.test.runtimeClasspath
|
||||
|
||||
reports {
|
||||
junitXml.required = true
|
||||
junitXml.outputPerTestCase = true
|
||||
junitXml.mergeReruns = true
|
||||
junitXml.includeSystemErrLog = true
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
|
||||
}
|
||||
|
||||
// Integration tests typically not parallel (but you can enable)
|
||||
maxParallelForks = 1
|
||||
commonTestConfig(t)
|
||||
}
|
||||
|
||||
tasks.register('unitTest', Test) { Test t ->
|
||||
description = 'Runs unit tests'
|
||||
group = 'verification'
|
||||
|
||||
useJUnitPlatform {
|
||||
excludeTags 'flaky', 'integration'
|
||||
}
|
||||
|
||||
testClassesDirs = sourceSets.test.output.classesDirs
|
||||
classpath = sourceSets.test.runtimeClasspath
|
||||
|
||||
reports {
|
||||
junitXml.required = true
|
||||
junitXml.outputPerTestCase = true
|
||||
junitXml.mergeReruns = true
|
||||
junitXml.includeSystemErrLog = true
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
|
||||
}
|
||||
|
||||
commonTestConfig(t)
|
||||
}
|
||||
|
||||
tasks.register('flakyTest', Test) { Test t ->
|
||||
@@ -239,7 +301,6 @@ subprojects {subProj ->
|
||||
useJUnitPlatform {
|
||||
includeTags 'flaky'
|
||||
}
|
||||
ignoreFailures = true
|
||||
|
||||
reports {
|
||||
junitXml.required = true
|
||||
@@ -249,10 +310,13 @@ subprojects {subProj ->
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
|
||||
}
|
||||
commonTestConfig(t)
|
||||
|
||||
}
|
||||
|
||||
test {
|
||||
// test task (default)
|
||||
tasks.named('test', Test) { Test t ->
|
||||
group = 'verification'
|
||||
description = 'Runs all non-flaky tests.'
|
||||
|
||||
useJUnitPlatform {
|
||||
excludeTags 'flaky'
|
||||
}
|
||||
@@ -263,10 +327,12 @@ subprojects {subProj ->
|
||||
junitXml.includeSystemErrLog = true
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
|
||||
}
|
||||
commonTestConfig(it)
|
||||
commonTestConfig(t)
|
||||
jvmArgs = ["-javaagent:${configurations.agent.singleFile}"]
|
||||
}
|
||||
|
||||
|
||||
finalizedBy(tasks.named('flakyTest'))
|
||||
tasks.named('check') {
|
||||
dependsOn(tasks.named('test'))// default behaviour
|
||||
}
|
||||
|
||||
testlogger {
|
||||
@@ -282,83 +348,25 @@ subprojects {subProj ->
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************\
|
||||
* End-to-End Tests
|
||||
**********************************************************************************************************************/
|
||||
def e2eTestsCheck = tasks.register('e2eTestsCheck') {
|
||||
group = 'verification'
|
||||
description = "Runs the 'check' task for all e2e-tests modules"
|
||||
doFirst {
|
||||
project.ext.set("e2e-tests", true)
|
||||
}
|
||||
}
|
||||
|
||||
subprojects {
|
||||
// Add e2e-tests modules check tasks to e2eTestsCheck
|
||||
if (project.name.startsWith("e2e-tests")) {
|
||||
test {
|
||||
onlyIf {
|
||||
project.hasProperty("e2e-tests")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
afterEvaluate {
|
||||
// Add e2e-tests modules check tasks to e2eTestsCheck
|
||||
if (project.name.startsWith("e2e-tests")) {
|
||||
e2eTestsCheck.configure {
|
||||
finalizedBy(check)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************\
|
||||
* Allure Reports
|
||||
**********************************************************************************************************************/
|
||||
subprojects {
|
||||
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
|
||||
dependencies {
|
||||
testImplementation platform("io.qameta.allure:allure-bom")
|
||||
testImplementation "io.qameta.allure:allure-junit5"
|
||||
}
|
||||
|
||||
configurations {
|
||||
agent {
|
||||
canBeResolved = true
|
||||
canBeConsumed = true
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
agent "org.aspectj:aspectjweaver:1.9.25"
|
||||
}
|
||||
|
||||
test {
|
||||
jvmArgs = ["-javaagent:${configurations.agent.singleFile}"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************\
|
||||
* Jacoco
|
||||
**********************************************************************************************************************/
|
||||
subprojects {
|
||||
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
|
||||
apply plugin: 'jacoco'
|
||||
|
||||
test {
|
||||
finalizedBy jacocoTestReport
|
||||
}
|
||||
|
||||
jacocoTestReport {
|
||||
dependsOn test
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tasks.named('check') {
|
||||
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
|
||||
finalizedBy jacocoTestReport
|
||||
}
|
||||
|
||||
tasks.register('unitTest') {
|
||||
// No jacocoTestReport here, because it depends by default on :test,
|
||||
// and that would make :test being run twice in our CI.
|
||||
// In practice the report will be generated later in the CI by :check.
|
||||
}
|
||||
|
||||
tasks.register('integrationTest') {
|
||||
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
|
||||
finalizedBy jacocoTestReport
|
||||
}
|
||||
|
||||
tasks.register('flakyTest') {
|
||||
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
|
||||
finalizedBy jacocoTestReport
|
||||
}
|
||||
|
||||
tasks.named('testCodeCoverageReport') {
|
||||
|
||||
@@ -42,7 +42,7 @@ import picocli.CommandLine.Option;
|
||||
@Introspected
|
||||
public abstract class AbstractCommand implements Callable<Integer> {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
protected ApplicationContext applicationContext;
|
||||
|
||||
@Inject
|
||||
private EndpointDefaultConfiguration endpointConfiguration;
|
||||
|
||||
@@ -18,7 +18,8 @@ import picocli.CommandLine;
|
||||
FlowDotCommand.class,
|
||||
FlowExportCommand.class,
|
||||
FlowUpdateCommand.class,
|
||||
FlowUpdatesCommand.class
|
||||
FlowUpdatesCommand.class,
|
||||
FlowsSyncFromSourceCommand.class
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import jakarta.inject.Inject;
|
||||
import java.util.List;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "syncFromSource",
|
||||
description = "Update a single flow",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
@Slf4j
|
||||
public class FlowsSyncFromSourceCommand extends AbstractApiCommand {
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
FlowRepositoryInterface repository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
String tenant = tenantService.getTenantId(tenantId);
|
||||
|
||||
List<FlowWithSource> persistedFlows = repository.findAllWithSource(tenant);
|
||||
|
||||
int count = 0;
|
||||
for (FlowWithSource persistedFlow : persistedFlows) {
|
||||
// Ensure exactly one trailing newline. We need this new line
|
||||
// because when we update a flow from its source,
|
||||
// we don't update it if no change is detected.
|
||||
// The goal here is to force an update from the source for every flows
|
||||
GenericFlow flow = GenericFlow.fromYaml(tenant,persistedFlow.getSource() + System.lineSeparator());
|
||||
repository.update(flow, persistedFlow);
|
||||
stdOut("- %s.%s".formatted(flow.getNamespace(), flow.getId()));
|
||||
count++;
|
||||
}
|
||||
stdOut("%s flow(s) successfully updated!".formatted(count));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
protected boolean loadExternalPlugins() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.models.kv.PersistedKvMetadata;
|
||||
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
@@ -11,44 +12,40 @@ import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.kv.InternalKVStore;
|
||||
import io.kestra.core.storages.kv.KVEntry;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import jakarta.inject.Inject;
|
||||
import io.kestra.core.utils.NamespaceUtils;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.AllArgsConstructor;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@Singleton
|
||||
@AllArgsConstructor
|
||||
public class MetadataMigrationService {
|
||||
@Inject
|
||||
private TenantService tenantService;
|
||||
protected FlowRepositoryInterface flowRepository;
|
||||
protected TenantService tenantService;
|
||||
protected KvMetadataRepositoryInterface kvMetadataRepository;
|
||||
protected NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository;
|
||||
protected StorageInterface storageInterface;
|
||||
protected NamespaceUtils namespaceUtils;
|
||||
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
private KvMetadataRepositoryInterface kvMetadataRepository;
|
||||
|
||||
@Inject
|
||||
private NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository;
|
||||
|
||||
@Inject
|
||||
private StorageInterface storageInterface;
|
||||
|
||||
protected Map<String, List<String>> namespacesPerTenant() {
|
||||
@VisibleForTesting
|
||||
public Map<String, List<String>> namespacesPerTenant() {
|
||||
String tenantId = tenantService.resolveTenant();
|
||||
return Map.of(tenantId, flowRepository.findDistinctNamespace(tenantId));
|
||||
return Map.of(tenantId, Stream.concat(
|
||||
Stream.of(namespaceUtils.getSystemFlowNamespace()),
|
||||
flowRepository.findDistinctNamespace(tenantId).stream()
|
||||
).map(NamespaceUtils::asTree).flatMap(Collection::stream).distinct().toList());
|
||||
}
|
||||
|
||||
public void kvMigration() throws IOException {
|
||||
@@ -106,10 +103,14 @@ public class MetadataMigrationService {
|
||||
private static List<PathAndAttributes> listAllFromStorage(StorageInterface storage, Function<String, String> prefixFunction, String tenant, String namespace) throws IOException {
|
||||
try {
|
||||
String prefix = prefixFunction.apply(namespace);
|
||||
if (!storage.exists(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + prefix))) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
return storage.allByPrefix(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + prefix + "/"), true).stream()
|
||||
.map(throwFunction(uri -> new PathAndAttributes(uri.getPath().substring(prefix.length()), storage.getAttributes(tenant, namespace, uri))))
|
||||
.toList();
|
||||
} catch (FileNotFoundException e) {
|
||||
} catch (FileNotFoundException | NoSuchFileException e) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ public class StateStoreMigrateCommand extends AbstractCommand {
|
||||
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
|
||||
String stateSubName = statesUriPart[statesUriPart.length - 1];
|
||||
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
|
||||
StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);
|
||||
StateStore stateStore = new StateStore(runContextFactory.of(flow, Map.of()), false);
|
||||
|
||||
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
|
||||
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
|
||||
@@ -70,12 +70,4 @@ public class StateStoreMigrateCommand extends AbstractCommand {
|
||||
stdOut("Successfully ran the state-store migration.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
|
||||
Map<String, String> flowVariables = new HashMap<>();
|
||||
flowVariables.put("tenantId", flow.getTenantId());
|
||||
flowVariables.put("id", flow.getId());
|
||||
flowVariables.put("namespace", flow.getNamespace());
|
||||
return runContextFactory.of(flow, Map.of("flow", flowVariables));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,73 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class FlowsSyncFromSourceCommandTest {
|
||||
@Test
|
||||
void updateAllFlowsFromSource() {
|
||||
URL directory = FlowUpdatesCommandTest.class.getClassLoader().getResource("flows");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--plugins",
|
||||
"/tmp", // pass this arg because it can cause failure
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"--delete",
|
||||
directory.getPath(),
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("successfully updated !");
|
||||
out.reset();
|
||||
|
||||
FlowRepositoryInterface repository = ctx.getBean(FlowRepositoryInterface.class);
|
||||
List<Flow> flows = repository.findAll(MAIN_TENANT);
|
||||
for (Flow flow : flows) {
|
||||
assertThat(flow.getRevision()).isEqualTo(1);
|
||||
}
|
||||
|
||||
args = new String[]{
|
||||
"--plugins",
|
||||
"/tmp", // pass this arg because it can cause failure
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word"
|
||||
|
||||
};
|
||||
PicocliRunner.call(FlowsSyncFromSourceCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("4 flow(s) successfully updated!");
|
||||
assertThat(out.toString()).contains("- io.kestra.outsider.quattro");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.second");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.third");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.first");
|
||||
|
||||
flows = repository.findAll(MAIN_TENANT);
|
||||
for (Flow flow : flows) {
|
||||
assertThat(flow.getRevision()).isEqualTo(2);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.core.utils.NamespaceUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class MetadataMigrationServiceTest<T extends MetadataMigrationService> {
|
||||
private static final String TENANT_ID = TestsUtils.randomTenant();
|
||||
|
||||
protected static final String SYSTEM_NAMESPACE = "my.system.namespace";
|
||||
|
||||
@Test
|
||||
void namespacesPerTenant() {
|
||||
Map<String, List<String>> expected = getNamespacesPerTenant();
|
||||
Map<String, List<String>> result = metadataMigrationService(
|
||||
expected
|
||||
).namespacesPerTenant();
|
||||
|
||||
assertThat(result).hasSize(expected.size());
|
||||
expected.forEach((tenantId, namespaces) -> {
|
||||
assertThat(result.get(tenantId)).containsExactlyInAnyOrderElementsOf(
|
||||
Stream.concat(
|
||||
Stream.of(SYSTEM_NAMESPACE),
|
||||
namespaces.stream()
|
||||
).map(NamespaceUtils::asTree).flatMap(Collection::stream).distinct().toList()
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
protected Map<String, List<String>> getNamespacesPerTenant() {
|
||||
return Map.of(TENANT_ID, List.of("my.first.namespace", "my.second.namespace", "another.namespace"));
|
||||
}
|
||||
|
||||
protected T metadataMigrationService(Map<String, List<String>> namespacesPerTenant) {
|
||||
FlowRepositoryInterface mockedFlowRepository = Mockito.mock(FlowRepositoryInterface.class);
|
||||
Mockito.doAnswer((params) -> namespacesPerTenant.get(params.getArgument(0).toString())).when(mockedFlowRepository).findDistinctNamespace(Mockito.anyString());
|
||||
NamespaceUtils namespaceUtils = Mockito.mock(NamespaceUtils.class);
|
||||
Mockito.when(namespaceUtils.getSystemFlowNamespace()).thenReturn(SYSTEM_NAMESPACE);
|
||||
//noinspection unchecked
|
||||
return ((T) new MetadataMigrationService(mockedFlowRepository, new TenantService() {
|
||||
@Override
|
||||
public String resolveTenant() {
|
||||
return TENANT_ID;
|
||||
}
|
||||
}, null, null, null, namespaceUtils));
|
||||
}
|
||||
}
|
||||
@@ -131,6 +131,36 @@ public class NsFilesMetadataMigrationCommandTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void namespaceWithoutNsFile() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
ByteArrayOutputStream err = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(err));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
String tenantId = TenantService.MAIN_TENANT;
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
|
||||
// A flow is created from namespace 1, so the namespace files in this namespace should be migrated
|
||||
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
|
||||
flowRepository.create(GenericFlow.of(Flow.builder()
|
||||
.tenantId(tenantId)
|
||||
.id("a-flow")
|
||||
.namespace(namespace)
|
||||
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
|
||||
.build()));
|
||||
|
||||
String[] nsFilesMetadataMigrationCommand = {
|
||||
"migrate", "metadata", "nsfiles"
|
||||
};
|
||||
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
|
||||
|
||||
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
|
||||
assertThat(err.toString()).doesNotContain("java.nio.file.NoSuchFileException");
|
||||
}
|
||||
}
|
||||
|
||||
private static void putOldNsFile(StorageInterface storage, String namespace, String path, String value) throws IOException {
|
||||
URI nsFileStorageUri = getNsFileStorageUri(namespace, path);
|
||||
storage.put(TenantService.MAIN_TENANT, namespace, nsFileStorageUri, new StorageObject(
|
||||
|
||||
@@ -55,11 +55,7 @@ class StateStoreMigrateCommandTest {
|
||||
);
|
||||
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
|
||||
|
||||
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
|
||||
"tenantId", tenantId,
|
||||
"id", flow.getId(),
|
||||
"namespace", flow.getNamespace()
|
||||
)));
|
||||
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of());
|
||||
StateStore stateStore = new StateStore(runContext, true);
|
||||
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));
|
||||
|
||||
|
||||
@@ -24,6 +24,9 @@ dependencies {
|
||||
// reactor
|
||||
api "io.projectreactor:reactor-core"
|
||||
|
||||
// awaitility
|
||||
api "org.awaitility:awaitility"
|
||||
|
||||
// micronaut
|
||||
api "io.micronaut.data:micronaut-data-model"
|
||||
implementation "io.micronaut:micronaut-http-server-netty"
|
||||
@@ -82,8 +85,8 @@ dependencies {
|
||||
testImplementation "io.micronaut:micronaut-http-server-netty"
|
||||
testImplementation "io.micronaut:micronaut-management"
|
||||
|
||||
testImplementation "org.testcontainers:testcontainers:1.21.3"
|
||||
testImplementation "org.testcontainers:junit-jupiter:1.21.3"
|
||||
testImplementation "org.testcontainers:testcontainers:1.21.4"
|
||||
testImplementation "org.testcontainers:junit-jupiter:1.21.4"
|
||||
testImplementation "org.bouncycastle:bcpkix-jdk18on"
|
||||
|
||||
testImplementation "org.wiremock:wiremock-jetty12"
|
||||
|
||||
@@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@@ -84,6 +85,11 @@ public abstract class KestraContext {
|
||||
|
||||
public abstract StorageInterface getStorageInterface();
|
||||
|
||||
/**
|
||||
* Returns the Micronaut active environments.
|
||||
*/
|
||||
public abstract Set<String> getEnvironments();
|
||||
|
||||
/**
|
||||
* Shutdowns the Kestra application.
|
||||
*/
|
||||
@@ -182,5 +188,10 @@ public abstract class KestraContext {
|
||||
// Lazy init of the PluginRegistry.
|
||||
return this.applicationContext.getBean(StorageInterface.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getEnvironments() {
|
||||
return this.applicationContext.getEnvironment().getActiveNames();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.docs;
|
||||
|
||||
import com.fasterxml.classmate.ResolvedType;
|
||||
import com.fasterxml.classmate.members.HierarchicType;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
@@ -22,6 +23,8 @@ import com.github.victools.jsonschema.module.swagger2.Swagger2Module;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.CustomAsset;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
@@ -42,13 +45,12 @@ import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.plugins.RegisteredPlugin;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import io.swagger.v3.oas.annotations.media.Content;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.*;
|
||||
import java.time.*;
|
||||
@@ -64,7 +66,7 @@ import static io.kestra.core.serializers.JacksonMapper.MAP_TYPE_REFERENCE;
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class JsonSchemaGenerator {
|
||||
|
||||
|
||||
private static final List<Class<?>> TYPES_RESOLVED_AS_STRING = List.of(Duration.class, LocalTime.class, LocalDate.class, LocalDateTime.class, ZonedDateTime.class, OffsetDateTime.class, OffsetTime.class);
|
||||
private static final List<Class<?>> SUBTYPE_RESOLUTION_EXCLUSION_FOR_PLUGIN_SCHEMA = List.of(Task.class, AbstractTrigger.class);
|
||||
|
||||
@@ -277,10 +279,10 @@ public class JsonSchemaGenerator {
|
||||
.with(Option.DEFINITION_FOR_MAIN_SCHEMA)
|
||||
.with(Option.PLAIN_DEFINITION_KEYS)
|
||||
.with(Option.ALLOF_CLEANUP_AT_THE_END);
|
||||
|
||||
|
||||
// HACK: Registered a custom JsonUnwrappedDefinitionProvider prior to the JacksonModule
|
||||
// to be able to return an CustomDefinition with an empty node when the ResolvedType can't be found.
|
||||
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider(){
|
||||
builder.forTypesInGeneral().withCustomDefinitionProvider(new JsonUnwrappedDefinitionProvider() {
|
||||
@Override
|
||||
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
|
||||
try {
|
||||
@@ -299,7 +301,9 @@ public class JsonSchemaGenerator {
|
||||
}
|
||||
|
||||
// default value
|
||||
builder.forFields().withDefaultResolver(this::defaults);
|
||||
builder.forFields()
|
||||
.withIgnoreCheck(fieldScope -> fieldScope.getAnnotation(Hidden.class) != null)
|
||||
.withDefaultResolver(this::defaults);
|
||||
|
||||
// def name
|
||||
builder.forTypesInGeneral()
|
||||
@@ -320,7 +324,7 @@ public class JsonSchemaGenerator {
|
||||
// inline some type
|
||||
builder.forTypesInGeneral()
|
||||
.withCustomDefinitionProvider(new CustomDefinitionProviderV2() {
|
||||
|
||||
|
||||
@Override
|
||||
public CustomDefinition provideCustomSchemaDefinition(ResolvedType javaType, SchemaGenerationContext context) {
|
||||
if (javaType.isInstanceOf(Map.class) || javaType.isInstanceOf(Enum.class)) {
|
||||
@@ -588,11 +592,31 @@ public class JsonSchemaGenerator {
|
||||
// The `const` property is used by editors for auto-completion based on that schema.
|
||||
builder.forTypesInGeneral().withTypeAttributeOverride((collectedTypeAttributes, scope, context) -> {
|
||||
final Class<?> pluginType = scope.getType().getErasedType();
|
||||
if (pluginType.getAnnotation(Plugin.class) != null) {
|
||||
Plugin pluginAnnotation = pluginType.getAnnotation(Plugin.class);
|
||||
if (pluginAnnotation != null) {
|
||||
ObjectNode properties = (ObjectNode) collectedTypeAttributes.get("properties");
|
||||
if (properties != null) {
|
||||
String typeConst = pluginType.getName();
|
||||
// This is needed so that assets can have arbitrary types while still being able to be identified as assets.
|
||||
if (pluginType == CustomAsset.class) {
|
||||
properties.set("type", context.getGeneratorConfig().createObjectNode()
|
||||
.put("type", "string")
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (Asset.class.isAssignableFrom(pluginType)) {
|
||||
// For Asset types, we want to be able to use a simple-string type. Convention is that first alias is that string type.
|
||||
typeConst = pluginAnnotation.aliases().length > 0 ? pluginAnnotation.aliases()[0] : pluginType.getName();
|
||||
Arrays.stream(pluginType.getDeclaredMethods())
|
||||
.filter(m -> m.isAnnotationPresent(JsonProperty.class))
|
||||
.forEach(m -> properties.set(m.getAnnotation(JsonProperty.class).value(), context.getGeneratorConfig().createObjectNode()
|
||||
.put("type", "string")
|
||||
));
|
||||
}
|
||||
|
||||
properties.set("type", context.getGeneratorConfig().createObjectNode()
|
||||
.put("const", pluginType.getName())
|
||||
.put("const", typeConst)
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -763,6 +787,14 @@ public class JsonSchemaGenerator {
|
||||
consumer.accept(typeContext.resolve(clz));
|
||||
}
|
||||
}).toList();
|
||||
} else if (declaredType.getErasedType() == Asset.class) {
|
||||
return getRegisteredPlugins()
|
||||
.stream()
|
||||
.flatMap(registeredPlugin -> registeredPlugin.getAssets().stream())
|
||||
.filter(p -> allowedPluginTypes.isEmpty() || allowedPluginTypes.contains(p.getName()))
|
||||
.filter(Predicate.not(io.kestra.core.models.Plugin::isInternal))
|
||||
.map(typeContext::resolve)
|
||||
.toList();
|
||||
}
|
||||
|
||||
return null;
|
||||
@@ -809,9 +841,9 @@ public class JsonSchemaGenerator {
|
||||
// we don't return base properties unless specified with @PluginProperty and hidden is false
|
||||
builder
|
||||
.forFields()
|
||||
.withIgnoreCheck(fieldScope -> base != null &&
|
||||
.withIgnoreCheck(fieldScope -> (base != null &&
|
||||
(fieldScope.getAnnotation(PluginProperty.class) == null || fieldScope.getAnnotation(PluginProperty.class).hidden()) &&
|
||||
fieldScope.getDeclaringType().getTypeName().equals(base.getName())
|
||||
fieldScope.getDeclaringType().getTypeName().equals(base.getName())) || fieldScope.getAnnotation(Hidden.class) != null
|
||||
);
|
||||
|
||||
SchemaGeneratorConfig schemaGeneratorConfig = builder.build();
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.docs;
|
||||
import io.kestra.core.models.annotations.PluginSubGroup;
|
||||
import io.kestra.core.plugins.RegisteredPlugin;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@@ -117,10 +118,17 @@ public class Plugin {
|
||||
.filter(not(io.kestra.core.models.Plugin::isInternal))
|
||||
.filter(clazzFilter)
|
||||
.filter(c -> !c.getName().startsWith("org.kestra."))
|
||||
.map(c -> new PluginElementMetadata(c.getName(), io.kestra.core.models.Plugin.isDeprecated(c) ? true : null))
|
||||
.map(c -> {
|
||||
Schema schema = c.getAnnotation(Schema.class);
|
||||
|
||||
var title = Optional.ofNullable(schema).map(Schema::title).filter(t -> !t.isEmpty()).orElse(null);
|
||||
var description = Optional.ofNullable(schema).map(Schema::description).filter(d -> !d.isEmpty()).orElse(null);
|
||||
var deprecated = io.kestra.core.models.Plugin.isDeprecated(c) ? true : null;
|
||||
|
||||
return new PluginElementMetadata(c.getName(), deprecated, title, description);
|
||||
})
|
||||
.toList();
|
||||
}
|
||||
|
||||
public record PluginElementMetadata(String cls, Boolean deprecated) {
|
||||
}
|
||||
public record PluginElementMetadata(String cls, Boolean deprecated, String title, String description) {}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import io.kestra.core.models.flows.Data;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Exception that can be thrown when Inputs/Outputs have validation problems.
|
||||
*/
|
||||
public class InputOutputValidationException extends KestraRuntimeException {
|
||||
public InputOutputValidationException(String message) {
|
||||
super(message);
|
||||
}
|
||||
public static InputOutputValidationException of( String message, Input<?> input){
|
||||
String inputMessage = "Invalid value for input" + " `" + input.getId() + "`. Cause: " + message;
|
||||
return new InputOutputValidationException(inputMessage);
|
||||
}
|
||||
public static InputOutputValidationException of( String message, Output output){
|
||||
String outputMessage = "Invalid value for output" + " `" + output.getId() + "`. Cause: " + message;
|
||||
return new InputOutputValidationException(outputMessage);
|
||||
}
|
||||
public static InputOutputValidationException of(String message){
|
||||
return new InputOutputValidationException(message);
|
||||
}
|
||||
|
||||
public static InputOutputValidationException merge(Set<InputOutputValidationException> exceptions){
|
||||
String combinedMessage = exceptions.stream()
|
||||
.map(InputOutputValidationException::getMessage)
|
||||
.collect(Collectors.joining(System.lineSeparator()));
|
||||
throw new InputOutputValidationException(combinedMessage);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* The top-level {@link KestraRuntimeException} for non-recoverable errors.
|
||||
|
||||
@@ -7,7 +7,6 @@ import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipInputStream;
|
||||
@@ -65,7 +64,7 @@ public interface HasSource {
|
||||
|
||||
if (isYAML(fileName)) {
|
||||
byte[] bytes = inputStream.readAllBytes();
|
||||
List<String> sources = List.of(new String(bytes).split("---"));
|
||||
List<String> sources = List.of(new String(bytes).split("(?m)^---\\s*$"));
|
||||
for (int i = 0; i < sources.size(); i++) {
|
||||
String source = sources.get(i);
|
||||
reader.accept(source, String.valueOf(i));
|
||||
|
||||
@@ -4,13 +4,16 @@ import io.kestra.core.utils.MapUtils;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.")
|
||||
public record Label(@NotEmpty String key, @NotEmpty String value) {
|
||||
public record Label(
|
||||
@NotEmpty @Pattern(regexp = "^[\\p{Ll}][\\p{L}0-9._-]*$", message = "Invalid label key. A valid key contains only lowercase letters numbers hyphens (-) underscores (_) or periods (.) and must begin with a lowercase letter.") String key,
|
||||
@NotEmpty String value) {
|
||||
public static final String SYSTEM_PREFIX = "system.";
|
||||
|
||||
// system labels
|
||||
@@ -23,6 +26,7 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
|
||||
public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
|
||||
public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
|
||||
public static final String TEST = SYSTEM_PREFIX + "test";
|
||||
public static final String FROM = SYSTEM_PREFIX + "from";
|
||||
|
||||
/**
|
||||
* Static helper method for converting a list of labels to a nested map.
|
||||
|
||||
@@ -94,7 +94,7 @@ public record QueryFilter(
|
||||
KIND("kind") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS,Op.NOT_EQUALS);
|
||||
return List.of(Op.EQUALS,Op.NOT_EQUALS, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
LABELS("labels") {
|
||||
@@ -103,12 +103,48 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
|
||||
}
|
||||
},
|
||||
METADATA("metadata") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN, Op.CONTAINS);
|
||||
}
|
||||
},
|
||||
FLOW_ID("flowId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
|
||||
}
|
||||
},
|
||||
FLOW_REVISION("flowRevision") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
ID("id") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
|
||||
}
|
||||
},
|
||||
ASSET_ID("assetId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
|
||||
}
|
||||
},
|
||||
TYPE("type") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
|
||||
}
|
||||
},
|
||||
CREATED("created") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.GREATER_THAN_OR_EQUAL_TO, Op.GREATER_THAN, Op.LESS_THAN_OR_EQUAL_TO, Op.LESS_THAN, Op.EQUALS, Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
UPDATED("updated") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
@@ -151,12 +187,30 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
TRIGGER_STATE("triggerState"){
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
EXECUTION_ID("executionId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
TASK_ID("taskId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
TASK_RUN_ID("taskRunId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
CHILD_FILTER("childFilter") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
@@ -226,7 +280,7 @@ public record QueryFilter(
|
||||
FLOW {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(Field.LABELS, Field.NAMESPACE, Field.QUERY, Field.SCOPE);
|
||||
return List.of(Field.LABELS, Field.NAMESPACE, Field.QUERY, Field.SCOPE, Field.FLOW_ID);
|
||||
}
|
||||
},
|
||||
NAMESPACE {
|
||||
@@ -241,7 +295,7 @@ public record QueryFilter(
|
||||
return List.of(
|
||||
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
|
||||
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
|
||||
Field.NAMESPACE,Field.KIND
|
||||
Field.NAMESPACE, Field.KIND
|
||||
);
|
||||
}
|
||||
},
|
||||
@@ -271,7 +325,7 @@ public record QueryFilter(
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.WORKER_ID, Field.FLOW_ID,
|
||||
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID
|
||||
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID, Field.TRIGGER_STATE
|
||||
);
|
||||
}
|
||||
},
|
||||
@@ -306,6 +360,34 @@ public record QueryFilter(
|
||||
Field.UPDATED
|
||||
);
|
||||
}
|
||||
},
|
||||
ASSET {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(
|
||||
Field.QUERY,
|
||||
Field.ID,
|
||||
Field.TYPE,
|
||||
Field.NAMESPACE,
|
||||
Field.METADATA,
|
||||
Field.UPDATED
|
||||
);
|
||||
}
|
||||
},
|
||||
ASSET_USAGE {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(
|
||||
Field.ASSET_ID,
|
||||
Field.NAMESPACE,
|
||||
Field.FLOW_ID,
|
||||
Field.FLOW_REVISION,
|
||||
Field.EXECUTION_ID,
|
||||
Field.TASK_ID,
|
||||
Field.TASK_RUN_ID,
|
||||
Field.CREATED
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
public abstract List<Field> supportedField();
|
||||
|
||||
@@ -16,6 +16,7 @@ import jakarta.validation.constraints.NotNull;
|
||||
public class Setting {
|
||||
public static final String INSTANCE_UUID = "instance.uuid";
|
||||
public static final String INSTANCE_VERSION = "instance.version";
|
||||
public static final String INSTANCE_EDITION = "instance.edition";
|
||||
|
||||
@NotNull
|
||||
private String key;
|
||||
|
||||
111
core/src/main/java/io/kestra/core/models/assets/Asset.java
Normal file
111
core/src/main/java/io/kestra/core/models/assets/Asset.java
Normal file
@@ -0,0 +1,111 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonAnySetter;
|
||||
import io.kestra.core.models.DeletedInterface;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
import jakarta.validation.constraints.Size;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public abstract class Asset implements HasUID, DeletedInterface, Plugin {
|
||||
@Hidden
|
||||
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
||||
protected String tenantId;
|
||||
|
||||
@Pattern(regexp = "^[a-z0-9][a-z0-9._-]*")
|
||||
@Size(min = 1, max = 150)
|
||||
protected String namespace;
|
||||
|
||||
@NotBlank
|
||||
@Pattern(regexp = "^[a-zA-Z0-9][a-zA-Z0-9._-]*")
|
||||
@Size(min = 1, max = 150)
|
||||
protected String id;
|
||||
|
||||
@NotBlank
|
||||
protected String type;
|
||||
|
||||
protected String displayName;
|
||||
|
||||
protected String description;
|
||||
|
||||
protected Map<String, Object> metadata;
|
||||
|
||||
@Nullable
|
||||
@Hidden
|
||||
private Instant created;
|
||||
|
||||
@Nullable
|
||||
@Hidden
|
||||
private Instant updated;
|
||||
|
||||
@Hidden
|
||||
private boolean deleted;
|
||||
|
||||
public Asset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String type,
|
||||
String displayName,
|
||||
String description,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
this.tenantId = tenantId;
|
||||
this.namespace = namespace;
|
||||
this.id = id;
|
||||
this.type = type;
|
||||
this.displayName = displayName;
|
||||
this.description = description;
|
||||
this.metadata = Optional.ofNullable(metadata).map(HashMap::new).orElse(new HashMap<>());
|
||||
Instant now = Instant.now();
|
||||
this.created = Optional.ofNullable(created).orElse(now);
|
||||
this.updated = Optional.ofNullable(updated).orElse(now);
|
||||
this.deleted = deleted;
|
||||
}
|
||||
|
||||
public <T extends Asset> T toUpdated() {
|
||||
if (this.created == null) {
|
||||
this.created = Instant.now();
|
||||
}
|
||||
this.updated = Instant.now();
|
||||
return (T) this;
|
||||
}
|
||||
|
||||
public Asset toDeleted() {
|
||||
this.deleted = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
@JsonAnySetter
|
||||
public void setMetadata(String name, Object value) {
|
||||
metadata.put(name, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return Asset.uid(tenantId, id);
|
||||
}
|
||||
|
||||
public static String uid(String tenantId, String id) {
|
||||
return IdUtils.fromParts(tenantId, id);
|
||||
}
|
||||
|
||||
public Asset withTenantId(String tenantId) {
|
||||
this.tenantId = tenantId;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
|
||||
public record AssetIdentifier(@Hidden String tenantId, @Hidden String namespace, String id){
|
||||
|
||||
public AssetIdentifier withTenantId(String tenantId) {
|
||||
return new AssetIdentifier(tenantId, this.namespace, this.id);
|
||||
}
|
||||
|
||||
public String uid() {
|
||||
return IdUtils.fromParts(tenantId, id);
|
||||
}
|
||||
|
||||
public static AssetIdentifier of(Asset asset) {
|
||||
return new AssetIdentifier(asset.getTenantId(), asset.getNamespace(), asset.getId());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.flows.FlowId;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
|
||||
/**
|
||||
* Represents an entity that used an asset
|
||||
*/
|
||||
public record AssetUser(String tenantId, String namespace, String flowId, Integer flowRevision, String executionId, String taskId, String taskRunId) implements HasUID {
|
||||
public String uid() {
|
||||
return IdUtils.fromParts(tenantId, namespace, flowId, String.valueOf(flowRevision), executionId, taskRunId);
|
||||
}
|
||||
|
||||
public FlowId toFlowId() {
|
||||
return FlowId.of(tenantId, namespace, flowId, flowRevision);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Getter
|
||||
public class AssetsDeclaration extends AssetsInOut {
|
||||
private boolean enableAuto;
|
||||
|
||||
@JsonCreator
|
||||
public AssetsDeclaration(Boolean enableAuto, List<AssetIdentifier> inputs, List<Asset> outputs) {
|
||||
super(inputs, outputs);
|
||||
|
||||
this.enableAuto = Optional.ofNullable(enableAuto).orElse(false);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Getter
|
||||
public class AssetsInOut {
|
||||
private List<AssetIdentifier> inputs;
|
||||
|
||||
private List<Asset> outputs;
|
||||
|
||||
@JsonCreator
|
||||
public AssetsInOut(List<AssetIdentifier> inputs, List<Asset> outputs) {
|
||||
this.inputs = Optional.ofNullable(inputs).orElse(Collections.emptyList());
|
||||
this.outputs = Optional.ofNullable(outputs).orElse(Collections.emptyList());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin
|
||||
public class CustomAsset extends Asset {
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public CustomAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String type,
|
||||
String displayName,
|
||||
String description,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, type, displayName, description, metadata, created, updated, deleted);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = DatasetAsset.ASSET_TYPE)
|
||||
public class DatasetAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "DATASET";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public DatasetAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
String system,
|
||||
String location,
|
||||
String format,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
|
||||
this.setSystem(system);
|
||||
this.setLocation(location);
|
||||
this.setFormat(format);
|
||||
}
|
||||
|
||||
@JsonProperty("system")
|
||||
public String getSystem() {
|
||||
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("location")
|
||||
public String getLocation() {
|
||||
return Optional.ofNullable(metadata.get("location")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("format")
|
||||
public String getFormat() {
|
||||
return Optional.ofNullable(metadata.get("format")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
public void setSystem(String system) {
|
||||
if (system != null) {
|
||||
metadata.put("system", system);
|
||||
}
|
||||
}
|
||||
|
||||
public void setLocation(String location) {
|
||||
if (location != null) {
|
||||
metadata.put("location", location);
|
||||
}
|
||||
}
|
||||
|
||||
public void setFormat(String format) {
|
||||
if (format != null) {
|
||||
metadata.put("format", format);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = ExternalAsset.ASSET_TYPE)
|
||||
public class ExternalAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "EXTERNAL";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public ExternalAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = FileAsset.ASSET_TYPE)
|
||||
public class FileAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "FILE";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public FileAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
String system,
|
||||
String path,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
|
||||
this.setSystem(system);
|
||||
this.setPath(path);
|
||||
}
|
||||
|
||||
@JsonProperty("system")
|
||||
public String getSystem() {
|
||||
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("path")
|
||||
public String getPath() {
|
||||
return Optional.ofNullable(metadata.get("path")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
public void setSystem(String system) {
|
||||
if (system != null) {
|
||||
metadata.put("system", system);
|
||||
}
|
||||
}
|
||||
|
||||
public void setPath(String path) {
|
||||
if (path != null) {
|
||||
metadata.put("path", path);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,86 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = TableAsset.ASSET_TYPE)
|
||||
public class TableAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "TABLE";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public TableAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
String system,
|
||||
String database,
|
||||
String schema,
|
||||
String name,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
|
||||
this.setSystem(system);
|
||||
this.setDatabase(database);
|
||||
this.setSchema(schema);
|
||||
this.setName(name);
|
||||
}
|
||||
|
||||
@JsonProperty("system")
|
||||
public String getSystem() {
|
||||
return Optional.ofNullable(metadata.get("system")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("database")
|
||||
public String getDatabase() {
|
||||
return Optional.ofNullable(metadata.get("database")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("schema")
|
||||
public String getSchema() {
|
||||
return Optional.ofNullable(metadata.get("schema")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("name")
|
||||
public String getName() {
|
||||
return Optional.ofNullable(metadata.get("name")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
public void setSystem(String system) {
|
||||
if (system != null) {
|
||||
metadata.put("system", system);
|
||||
}
|
||||
}
|
||||
|
||||
public void setDatabase(String database) {
|
||||
if (database != null) {
|
||||
metadata.put("database", database);
|
||||
}
|
||||
}
|
||||
|
||||
public void setSchema(String schema) {
|
||||
if (schema != null) {
|
||||
metadata.put("schema", schema);
|
||||
}
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
if (name != null) {
|
||||
metadata.put("name", name);
|
||||
}
|
||||
}
|
||||
}
|
||||
73
core/src/main/java/io/kestra/core/models/assets/VmAsset.java
Normal file
73
core/src/main/java/io/kestra/core/models/assets/VmAsset.java
Normal file
@@ -0,0 +1,73 @@
|
||||
package io.kestra.core.models.assets;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import lombok.Builder;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
@NoArgsConstructor
|
||||
@Plugin(aliases = VmAsset.ASSET_TYPE)
|
||||
public class VmAsset extends Asset {
|
||||
public static final String ASSET_TYPE = "VM";
|
||||
|
||||
@Builder
|
||||
@JsonCreator
|
||||
public VmAsset(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String id,
|
||||
String displayName,
|
||||
String description,
|
||||
String provider,
|
||||
String region,
|
||||
String state,
|
||||
Map<String, Object> metadata,
|
||||
Instant created,
|
||||
Instant updated,
|
||||
boolean deleted
|
||||
) {
|
||||
super(tenantId, namespace, id, ASSET_TYPE, displayName, description, metadata, created, updated, deleted);
|
||||
|
||||
this.setProvider(provider);
|
||||
this.setRegion(region);
|
||||
this.setState(state);
|
||||
}
|
||||
|
||||
@JsonProperty("provider")
|
||||
public String getProvider() {
|
||||
return Optional.ofNullable(metadata.get("provider")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("region")
|
||||
public String getRegion() {
|
||||
return Optional.ofNullable(metadata.get("region")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
@JsonProperty("state")
|
||||
public String getState() {
|
||||
return Optional.ofNullable(metadata.get("state")).map(Object::toString).orElse(null);
|
||||
}
|
||||
|
||||
public void setProvider(String provider) {
|
||||
if (provider != null) {
|
||||
metadata.put("provider", provider);
|
||||
}
|
||||
}
|
||||
|
||||
public void setRegion(String region) {
|
||||
if (region != null) {
|
||||
metadata.put("region", region);
|
||||
}
|
||||
}
|
||||
|
||||
public void setState(String state) {
|
||||
if (state != null) {
|
||||
metadata.put("state", state);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,10 +2,9 @@ package io.kestra.core.models.executions;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.models.assets.AssetsInOut;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
@@ -59,6 +58,10 @@ public class TaskRun implements TenantInterface {
|
||||
@Schema(implementation = Object.class)
|
||||
Variables outputs;
|
||||
|
||||
@With
|
||||
@Nullable
|
||||
AssetsInOut assets;
|
||||
|
||||
@NotNull
|
||||
State state;
|
||||
|
||||
@@ -89,14 +92,23 @@ public class TaskRun implements TenantInterface {
|
||||
this.value,
|
||||
this.attempts,
|
||||
this.outputs,
|
||||
this.assets,
|
||||
this.state.withState(state),
|
||||
this.iteration,
|
||||
this.dynamic,
|
||||
this.forceExecution
|
||||
);
|
||||
}
|
||||
public TaskRun withStateAndAttempt(State.Type state) {
|
||||
List<TaskRunAttempt> newAttempts = new ArrayList<>(this.attempts != null ? this.attempts : List.of());
|
||||
|
||||
if (newAttempts.isEmpty()) {
|
||||
newAttempts.add(TaskRunAttempt.builder().state(new State(state)).build());
|
||||
} else {
|
||||
TaskRunAttempt updatedLast = newAttempts.getLast().withState(state);
|
||||
newAttempts.set(newAttempts.size() - 1, updatedLast);
|
||||
}
|
||||
|
||||
public TaskRun replaceState(State newState) {
|
||||
return new TaskRun(
|
||||
this.tenantId,
|
||||
this.id,
|
||||
@@ -106,9 +118,10 @@ public class TaskRun implements TenantInterface {
|
||||
this.taskId,
|
||||
this.parentTaskRunId,
|
||||
this.value,
|
||||
this.attempts,
|
||||
newAttempts,
|
||||
this.outputs,
|
||||
newState,
|
||||
this.assets,
|
||||
this.state.withState(state),
|
||||
this.iteration,
|
||||
this.dynamic,
|
||||
this.forceExecution
|
||||
@@ -131,6 +144,7 @@ public class TaskRun implements TenantInterface {
|
||||
this.value,
|
||||
newAttempts,
|
||||
this.outputs,
|
||||
this.assets,
|
||||
this.state.withState(State.Type.FAILED),
|
||||
this.iteration,
|
||||
this.dynamic,
|
||||
@@ -150,6 +164,7 @@ public class TaskRun implements TenantInterface {
|
||||
.value(this.getValue())
|
||||
.attempts(this.getAttempts())
|
||||
.outputs(this.getOutputs())
|
||||
.assets(this.getAssets())
|
||||
.state(state == null ? this.getState() : state)
|
||||
.iteration(this.getIteration())
|
||||
.build();
|
||||
@@ -236,6 +251,7 @@ public class TaskRun implements TenantInterface {
|
||||
", parentTaskRunId=" + this.getParentTaskRunId() +
|
||||
", state=" + this.getState().getCurrent().toString() +
|
||||
", outputs=" + this.getOutputs() +
|
||||
", assets=" + this.getAssets() +
|
||||
", attempts=" + this.getAttempts() +
|
||||
")";
|
||||
}
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
/**
|
||||
* Interface for defining an identifiable and typed data.
|
||||
@@ -29,16 +27,4 @@ public interface Data {
|
||||
*/
|
||||
String getDisplayName();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
default ConstraintViolationException toConstraintViolationException(String message, Object value) {
|
||||
Class<Data> cls = (Class<Data>) this.getClass();
|
||||
|
||||
return ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid " + (this instanceof Output ? "output" : "input") + " for `" + getId() + "`, " + message + ", but received `" + value + "`",
|
||||
this,
|
||||
cls,
|
||||
this.getId(),
|
||||
value
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
@@ -130,7 +129,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
@Valid
|
||||
@PluginProperty
|
||||
List<SLA> sla;
|
||||
|
||||
|
||||
@Schema(
|
||||
title = "Conditions evaluated before the flow is executed.",
|
||||
description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."
|
||||
@@ -355,7 +354,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
* To be conservative a flow MUST not return any source.
|
||||
*/
|
||||
@Override
|
||||
@JsonIgnore
|
||||
@Schema(hidden = true)
|
||||
public String getSource() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@@ -48,7 +46,7 @@ public class FlowWithSource extends Flow {
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonIgnore(value = false)
|
||||
@Schema(hidden = false)
|
||||
public String getSource() {
|
||||
return this.source;
|
||||
}
|
||||
|
||||
@@ -267,6 +267,10 @@ public class State {
|
||||
return this == Type.RUNNING || this == Type.KILLING;
|
||||
}
|
||||
|
||||
public boolean onlyRunning() {
|
||||
return this == Type.RUNNING;
|
||||
}
|
||||
|
||||
public boolean isFailed() {
|
||||
return this == Type.FAILED;
|
||||
}
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
package io.kestra.core.models.flows.input;
|
||||
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Represents an input along with its associated value and validation state.
|
||||
*
|
||||
@@ -12,15 +14,15 @@ import jakarta.validation.constraints.NotNull;
|
||||
* @param value The provided value for the input.
|
||||
* @param enabled {@code true} if the input is enabled; {@code false} otherwise.
|
||||
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
|
||||
* @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
|
||||
* @param exceptions The validation exceptions, if the input value is invalid; {@code null} otherwise.
|
||||
*/
|
||||
public record InputAndValue(
|
||||
Input<?> input,
|
||||
Object value,
|
||||
boolean enabled,
|
||||
boolean isDefault,
|
||||
ConstraintViolationException exception) {
|
||||
|
||||
Set<InputOutputValidationException> exceptions) {
|
||||
|
||||
/**
|
||||
* Creates a new {@link InputAndValue} instance.
|
||||
*
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.validations.Regex;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.ConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Builder;
|
||||
@@ -14,10 +15,7 @@ import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
|
||||
@SuperBuilder
|
||||
@@ -77,30 +75,35 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
|
||||
|
||||
@Override
|
||||
public void validate(List<String> inputs) throws ConstraintViolationException {
|
||||
Set<ConstraintViolation<?>> violations = new HashSet<>();
|
||||
|
||||
if (values != null && options != null) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
violations.add( ManualConstraintViolation.of(
|
||||
"you can't define both `values` and `options`",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
""
|
||||
);
|
||||
));
|
||||
}
|
||||
|
||||
if (!this.getAllowCustomValue()) {
|
||||
for (String input : inputs) {
|
||||
List<@Regex String> finalValues = this.values != null ? this.values : this.options;
|
||||
if (!finalValues.contains(input)) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"it must match the values `" + finalValues + "`",
|
||||
violations.add(ManualConstraintViolation.of(
|
||||
"value `" + input + "` doesn't match the values `" + finalValues + "`",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
input
|
||||
);
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!violations.isEmpty()) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(violations);
|
||||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} **/
|
||||
@@ -145,7 +148,7 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
|
||||
|
||||
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid expression result. Expected a list of strings, but received " + type,
|
||||
"Invalid expression result. Expected a list of strings",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
|
||||
@@ -125,7 +125,7 @@ public class SelectInput extends Input<String> implements RenderableInput {
|
||||
|
||||
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid expression result. Expected a list of strings, but received " + type,
|
||||
"Invalid expression result. Expected a list of strings",
|
||||
this,
|
||||
SelectInput.class,
|
||||
getId(),
|
||||
|
||||
@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextProperty;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -93,7 +94,7 @@ public class Property<T> {
|
||||
* @return a new {@link Property} without a pre-rendered value
|
||||
*/
|
||||
public Property<T> skipCache() {
|
||||
return Property.ofExpression(expression);
|
||||
return new Property<>(expression, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -156,9 +157,9 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property, then convert it to its target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class)
|
||||
* @see RunContextProperty#as(Class)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return as(property, context, clazz, Map.of());
|
||||
@@ -167,25 +168,57 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property with additional variables, then convert it to its target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
|
||||
* @see RunContextProperty#as(Class, Map)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.skipCache || property.value == null) {
|
||||
String rendered = context.render(property.expression, variables);
|
||||
property.value = MAPPER.convertValue(rendered, clazz);
|
||||
property.value = deserialize(rendered, clazz);
|
||||
}
|
||||
|
||||
return property.value;
|
||||
}
|
||||
|
||||
private static <T> T deserialize(Object rendered, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
try {
|
||||
return MAPPER.convertValue(rendered, clazz);
|
||||
} catch (IllegalArgumentException e) {
|
||||
if (rendered instanceof String str) {
|
||||
try {
|
||||
return MAPPER.readValue(str, clazz);
|
||||
} catch (JsonProcessingException ex) {
|
||||
throw new IllegalVariableEvaluationException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> T deserialize(Object rendered, JavaType type) throws IllegalVariableEvaluationException {
|
||||
try {
|
||||
return MAPPER.convertValue(rendered, type);
|
||||
} catch (IllegalArgumentException e) {
|
||||
if (rendered instanceof String str) {
|
||||
try {
|
||||
return MAPPER.readValue(str, type);
|
||||
} catch (JsonProcessingException ex) {
|
||||
throw new IllegalVariableEvaluationException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Render a property then convert it as a list of target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
|
||||
* @see RunContextProperty#asList(Class)
|
||||
*/
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
||||
return asList(property, context, itemClazz, Map.of());
|
||||
@@ -194,37 +227,39 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property with additional variables, then convert it as a list of target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
|
||||
* @see RunContextProperty#asList(Class, Map)
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.skipCache || property.value == null) {
|
||||
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
|
||||
try {
|
||||
String trimmedExpression = property.expression.trim();
|
||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = MAPPER.readValue(context.render(property.expression, variables), type);
|
||||
}
|
||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||
else {
|
||||
List<?> asRawList = MAPPER.readValue(property.expression, List.class);
|
||||
property.value = (T) asRawList.stream()
|
||||
.map(throwFunction(item -> {
|
||||
if (item instanceof String str) {
|
||||
return MAPPER.convertValue(context.render(str, variables), itemClazz);
|
||||
} else if (item instanceof Map map) {
|
||||
return MAPPER.convertValue(context.render(map, variables), itemClazz);
|
||||
}
|
||||
return item;
|
||||
}))
|
||||
.toList();
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
String trimmedExpression = property.expression.trim();
|
||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = deserialize(context.render(property.expression, variables), type);
|
||||
}
|
||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||
else {
|
||||
List<?> asRawList = deserialize(property.expression, List.class);
|
||||
property.value = (T) asRawList.stream()
|
||||
.map(throwFunction(item -> {
|
||||
Object rendered = null;
|
||||
if (item instanceof String str) {
|
||||
rendered = context.render(str, variables);
|
||||
} else if (item instanceof Map map) {
|
||||
rendered = context.render(map, variables);
|
||||
}
|
||||
|
||||
if (rendered != null) {
|
||||
return deserialize(rendered, itemClazz);
|
||||
}
|
||||
|
||||
return item;
|
||||
}))
|
||||
.toList();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,9 +269,9 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property then convert it as a map of target types.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
|
||||
* @see RunContextProperty#asMap(Class, Class)
|
||||
*/
|
||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
|
||||
return asMap(property, runContext, keyClass, valueClass, Map.of());
|
||||
@@ -248,7 +283,7 @@ public class Property<T> {
|
||||
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
|
||||
* Warning, due to the caching mechanism, this method is not thread-safe.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
|
||||
* @see RunContextProperty#asMap(Class, Class, Map)
|
||||
*/
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
@@ -260,12 +295,12 @@ public class Property<T> {
|
||||
// We need to detect if the expression is already a map or if it's a pebble expression (for eg. referencing a variable containing a map).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a map.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = MAPPER.readValue(runContext.render(property.expression, variables), targetMapType);
|
||||
property.value = deserialize(runContext.render(property.expression, variables), targetMapType);
|
||||
}
|
||||
// Otherwise if it's already a map we read it as a map first then render it from run context which handle map rendering by rendering each entry of the map (otherwise it will fail with nested expressions in values for eg.)
|
||||
else {
|
||||
Map asRawMap = MAPPER.readValue(property.expression, Map.class);
|
||||
property.value = MAPPER.convertValue(runContext.render(asRawMap, variables), targetMapType);
|
||||
property.value = deserialize(runContext.render(asRawMap, variables), targetMapType);
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
|
||||
@@ -5,11 +5,13 @@ import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.assets.AssetsDeclaration;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.plugin.core.flow.WorkingDirectory;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.Valid;
|
||||
import jakarta.validation.constraints.Size;
|
||||
import lombok.Builder;
|
||||
@@ -78,6 +80,11 @@ abstract public class Task implements TaskInterface {
|
||||
@Valid
|
||||
private Cache taskCache;
|
||||
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
@Valid
|
||||
@Nullable
|
||||
private Property<AssetsDeclaration> assets;
|
||||
|
||||
public Optional<Task> findById(String id) {
|
||||
if (this.getId().equals(id)) {
|
||||
return Optional.of(this);
|
||||
|
||||
@@ -4,7 +4,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.tasks.runners.TaskLogLineMatcher.TaskLogMatch;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -37,6 +36,7 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
abstract public class PluginUtilsService {
|
||||
|
||||
private static final TypeReference<Map<String, String>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
|
||||
private static final TaskLogLineMatcher LOG_LINE_MATCHER = new TaskLogLineMatcher();
|
||||
|
||||
public static Map<String, String> createOutputFiles(
|
||||
Path tempDirectory,
|
||||
@@ -169,12 +169,9 @@ abstract public class PluginUtilsService {
|
||||
}
|
||||
|
||||
public static Map<String, Object> parseOut(String line, Logger logger, RunContext runContext, boolean isStdErr, Instant customInstant) {
|
||||
|
||||
TaskLogLineMatcher logLineMatcher = ((DefaultRunContext) runContext).getApplicationContext().getBean(TaskLogLineMatcher.class);
|
||||
|
||||
Map<String, Object> outputs = new HashMap<>();
|
||||
try {
|
||||
Optional<TaskLogMatch> matches = logLineMatcher.matches(line, logger, runContext, customInstant);
|
||||
Optional<TaskLogMatch> matches = LOG_LINE_MATCHER.matches(line, logger, runContext, customInstant);
|
||||
if (matches.isPresent()) {
|
||||
TaskLogMatch taskLogMatch = matches.get();
|
||||
outputs.putAll(taskLogMatch.outputs());
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
package io.kestra.core.models.tasks.runners;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import io.kestra.core.runners.AssetEmitter;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.event.Level;
|
||||
import org.slf4j.spi.LoggingEventBuilder;
|
||||
@@ -18,6 +21,7 @@ import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
|
||||
/**
|
||||
* Service for matching and capturing structured data from task execution logs.
|
||||
@@ -27,7 +31,6 @@ import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
|
||||
* ::{"outputs":{"key":"value"}}::
|
||||
* }</pre>
|
||||
*/
|
||||
@Singleton
|
||||
public class TaskLogLineMatcher {
|
||||
|
||||
protected static final Pattern LOG_DATA_SYNTAX = Pattern.compile("^::(\\{.*})::$");
|
||||
@@ -77,6 +80,18 @@ public class TaskLogLineMatcher {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (match.assets() != null) {
|
||||
try {
|
||||
AssetEmitter assetEmitter = runContext.assets();
|
||||
match.assets().forEach(throwConsumer(assetEmitter::upsert));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
logger.warn("Unable to get asset emitter for log '{}'", data, e);
|
||||
} catch (QueueException e) {
|
||||
logger.warn("Unable to emit asset for log '{}'", data, e);
|
||||
}
|
||||
}
|
||||
|
||||
return match;
|
||||
}
|
||||
|
||||
@@ -95,8 +110,9 @@ public class TaskLogLineMatcher {
|
||||
public record TaskLogMatch(
|
||||
Map<String, Object> outputs,
|
||||
List<AbstractMetricEntry<?>> metrics,
|
||||
List<LogLine> logs
|
||||
) {
|
||||
List<LogLine> logs,
|
||||
List<Asset> assets
|
||||
) {
|
||||
@Override
|
||||
public Map<String, Object> outputs() {
|
||||
return Optional.ofNullable(outputs).orElse(Map.of());
|
||||
@@ -108,4 +124,4 @@ public class TaskLogLineMatcher {
|
||||
String message
|
||||
) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,8 +6,10 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||
import io.kestra.core.models.Label;
|
||||
import io.kestra.core.models.annotations.Plugin;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.assets.AssetsDeclaration;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.WorkerGroup;
|
||||
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
|
||||
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
|
||||
@@ -82,6 +84,15 @@ abstract public class AbstractTrigger implements TriggerInterface {
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
private boolean failOnTriggerError = false;
|
||||
|
||||
@PluginProperty(group = PluginProperty.CORE_GROUP)
|
||||
@Schema(
|
||||
title = "Specifies whether a trigger is allowed to start a new execution even if a previous run is still in progress."
|
||||
)
|
||||
private boolean allowConcurrent = false;
|
||||
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
private Property<AssetsDeclaration> assets;
|
||||
|
||||
/**
|
||||
* For backward compatibility: we rename minLogLevel to logLevel.
|
||||
* @deprecated use {@link #logLevel} instead
|
||||
|
||||
@@ -1,22 +1,37 @@
|
||||
package io.kestra.core.models.triggers;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Map;
|
||||
|
||||
public interface Schedulable extends PollingTriggerInterface{
|
||||
String PLUGIN_PROPERTY_RECOVER_MISSED_SCHEDULES = "recoverMissedSchedules";
|
||||
|
||||
@Schema(
|
||||
title = "The inputs to pass to the scheduled flow"
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
Map<String, Object> getInputs();
|
||||
|
||||
@Schema(
|
||||
title = "Action to take in the case of missed schedules",
|
||||
description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
|
||||
"The default is `ALL` unless a different value is configured using the global plugin configuration."
|
||||
)
|
||||
@PluginProperty
|
||||
RecoverMissedSchedules getRecoverMissedSchedules();
|
||||
|
||||
/**
|
||||
* Compute the previous evaluation of a trigger.
|
||||
* This is used when a trigger misses some schedule to compute the next date to evaluate in the past.
|
||||
*/
|
||||
ZonedDateTime previousEvaluationDate(ConditionContext conditionContext) throws IllegalVariableEvaluationException;
|
||||
|
||||
RecoverMissedSchedules getRecoverMissedSchedules();
|
||||
|
||||
|
||||
/**
|
||||
* Load the default RecoverMissedSchedules from plugin property, or else ALL.
|
||||
*/
|
||||
|
||||
@@ -172,7 +172,7 @@ public class Trigger extends TriggerContext implements HasUID {
|
||||
|
||||
if (abstractTrigger instanceof PollingTriggerInterface pollingTriggerInterface) {
|
||||
try {
|
||||
nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, Optional.empty());
|
||||
nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, lastTrigger);
|
||||
} catch (InvalidTriggerConfigurationException e) {
|
||||
disabled = true;
|
||||
}
|
||||
|
||||
@@ -6,12 +6,9 @@ import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionTrigger;
|
||||
import io.kestra.core.models.tasks.Output;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.FlowInputOutput;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.*;
|
||||
|
||||
public abstract class TriggerService {
|
||||
@@ -51,58 +48,6 @@ public abstract class TriggerService {
|
||||
return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext);
|
||||
}
|
||||
|
||||
public static Execution generateScheduledExecution(
|
||||
AbstractTrigger trigger,
|
||||
ConditionContext conditionContext,
|
||||
TriggerContext context,
|
||||
List<Label> labels,
|
||||
Map<String, Object> inputs,
|
||||
Map<String, Object> variables,
|
||||
Optional<ZonedDateTime> scheduleDate
|
||||
) {
|
||||
RunContext runContext = conditionContext.getRunContext();
|
||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);
|
||||
|
||||
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
|
||||
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
||||
// add a correlation ID if none exist
|
||||
executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
|
||||
}
|
||||
Execution execution = Execution.builder()
|
||||
.id(runContext.getTriggerExecutionId())
|
||||
.tenantId(context.getTenantId())
|
||||
.namespace(context.getNamespace())
|
||||
.flowId(context.getFlowId())
|
||||
.flowRevision(conditionContext.getFlow().getRevision())
|
||||
.variables(conditionContext.getFlow().getVariables())
|
||||
.labels(executionLabels)
|
||||
.state(new State())
|
||||
.trigger(executionTrigger)
|
||||
.scheduleDate(scheduleDate.map(date -> date.toInstant()).orElse(null))
|
||||
.build();
|
||||
|
||||
Map<String, Object> allInputs = new HashMap<>();
|
||||
// add flow inputs with default value
|
||||
var flow = conditionContext.getFlow();
|
||||
if (flow.getInputs() != null) {
|
||||
flow.getInputs().stream()
|
||||
.filter(input -> input.getDefaults() != null)
|
||||
.forEach(input -> allInputs.put(input.getId(), input.getDefaults()));
|
||||
}
|
||||
|
||||
if (inputs != null) {
|
||||
allInputs.putAll(inputs);
|
||||
}
|
||||
|
||||
// add inputs and inject defaults
|
||||
if (!allInputs.isEmpty()) {
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
|
||||
execution = execution.withInputs(flowInputOutput.readExecutionInputs(conditionContext.getFlow(), execution, allInputs));
|
||||
}
|
||||
|
||||
return execution;
|
||||
}
|
||||
|
||||
private static Execution generateExecution(
|
||||
String id,
|
||||
AbstractTrigger trigger,
|
||||
@@ -111,6 +56,7 @@ public abstract class TriggerService {
|
||||
ConditionContext conditionContext
|
||||
) {
|
||||
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(trigger.getLabels()));
|
||||
executionLabels.add(new Label(Label.FROM, "trigger"));
|
||||
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
||||
// add a correlation ID if none exist
|
||||
executionLabels.add(new Label(Label.CORRELATION_ID, id));
|
||||
|
||||
@@ -67,6 +67,11 @@ public class ManualConstraintViolation<T> implements ConstraintViolation<T> {
|
||||
invalidValue
|
||||
)));
|
||||
}
|
||||
public static <T> ConstraintViolationException toConstraintViolationException(
|
||||
Set<? extends ConstraintViolation<?>> constraintViolations
|
||||
) {
|
||||
return new ConstraintViolationException(constraintViolations);
|
||||
}
|
||||
|
||||
public String getMessageTemplate() {
|
||||
return "{messageTemplate}";
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.plugins;
|
||||
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
@@ -2,6 +2,7 @@ package io.kestra.core.plugins;
|
||||
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import io.kestra.core.app.AppPluginInterface;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
@@ -11,6 +12,7 @@ import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.logs.LogExporter;
|
||||
import io.kestra.core.models.tasks.runners.TaskRunner;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.plugins.serdes.AssetDeserializer;
|
||||
import io.kestra.core.plugins.serdes.PluginDeserializer;
|
||||
import io.kestra.core.secret.SecretPluginInterface;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -45,5 +47,6 @@ public class PluginModule extends SimpleModule {
|
||||
addDeserializer(SecretPluginInterface.class, new PluginDeserializer<>());
|
||||
addDeserializer(AppPluginInterface.class, new PluginDeserializer<>());
|
||||
addDeserializer(LogExporter.class, new PluginDeserializer<>());
|
||||
addDeserializer(Asset.class, new AssetDeserializer());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.plugins;
|
||||
import io.kestra.core.app.AppBlockInterface;
|
||||
import io.kestra.core.app.AppPluginInterface;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
@@ -108,6 +109,7 @@ public class PluginScanner {
|
||||
List<Class<? extends StorageInterface>> storages = new ArrayList<>();
|
||||
List<Class<? extends SecretPluginInterface>> secrets = new ArrayList<>();
|
||||
List<Class<? extends TaskRunner<?>>> taskRunners = new ArrayList<>();
|
||||
List<Class<? extends Asset>> assets = new ArrayList<>();
|
||||
List<Class<? extends AppPluginInterface>> apps = new ArrayList<>();
|
||||
List<Class<? extends AppBlockInterface>> appBlocks = new ArrayList<>();
|
||||
List<Class<? extends Chart<?>>> charts = new ArrayList<>();
|
||||
@@ -155,6 +157,10 @@ public class PluginScanner {
|
||||
//noinspection unchecked
|
||||
taskRunners.add((Class<? extends TaskRunner<?>>) runner.getClass());
|
||||
}
|
||||
case Asset asset -> {
|
||||
log.debug("Loading Asset plugin: '{}'", plugin.getClass());
|
||||
assets.add(asset.getClass());
|
||||
}
|
||||
case AppPluginInterface app -> {
|
||||
log.debug("Loading App plugin: '{}'", plugin.getClass());
|
||||
apps.add(app.getClass());
|
||||
@@ -223,6 +229,7 @@ public class PluginScanner {
|
||||
.conditions(conditions)
|
||||
.storages(storages)
|
||||
.secrets(secrets)
|
||||
.assets(assets)
|
||||
.apps(apps)
|
||||
.appBlocks(appBlocks)
|
||||
.taskRunners(taskRunners)
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.plugins;
|
||||
import io.kestra.core.app.AppBlockInterface;
|
||||
import io.kestra.core.app.AppPluginInterface;
|
||||
import io.kestra.core.models.annotations.PluginSubGroup;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.dashboards.DataFilter;
|
||||
import io.kestra.core.models.dashboards.DataFilterKPI;
|
||||
@@ -39,6 +40,7 @@ public class RegisteredPlugin {
|
||||
public static final String STORAGES_GROUP_NAME = "storages";
|
||||
public static final String SECRETS_GROUP_NAME = "secrets";
|
||||
public static final String TASK_RUNNERS_GROUP_NAME = "task-runners";
|
||||
public static final String ASSETS_GROUP_NAME = "assets";
|
||||
public static final String APPS_GROUP_NAME = "apps";
|
||||
public static final String APP_BLOCKS_GROUP_NAME = "app-blocks";
|
||||
public static final String CHARTS_GROUP_NAME = "charts";
|
||||
@@ -56,6 +58,7 @@ public class RegisteredPlugin {
|
||||
private final List<Class<? extends StorageInterface>> storages;
|
||||
private final List<Class<? extends SecretPluginInterface>> secrets;
|
||||
private final List<Class<? extends TaskRunner<?>>> taskRunners;
|
||||
private final List<Class<? extends Asset>> assets;
|
||||
private final List<Class<? extends AppPluginInterface>> apps;
|
||||
private final List<Class<? extends AppBlockInterface>> appBlocks;
|
||||
private final List<Class<? extends Chart<?>>> charts;
|
||||
@@ -74,6 +77,7 @@ public class RegisteredPlugin {
|
||||
!storages.isEmpty() ||
|
||||
!secrets.isEmpty() ||
|
||||
!taskRunners.isEmpty() ||
|
||||
!assets.isEmpty() ||
|
||||
!apps.isEmpty() ||
|
||||
!appBlocks.isEmpty() ||
|
||||
!charts.isEmpty() ||
|
||||
@@ -145,6 +149,10 @@ public class RegisteredPlugin {
|
||||
return AppPluginInterface.class;
|
||||
}
|
||||
|
||||
if (this.getAssets().stream().anyMatch(r -> r.getName().equals(cls))) {
|
||||
return Asset.class;
|
||||
}
|
||||
|
||||
if (this.getLogExporters().stream().anyMatch(r -> r.getName().equals(cls))) {
|
||||
return LogExporter.class;
|
||||
}
|
||||
@@ -180,6 +188,7 @@ public class RegisteredPlugin {
|
||||
result.put(STORAGES_GROUP_NAME, Arrays.asList(this.getStorages().toArray(Class[]::new)));
|
||||
result.put(SECRETS_GROUP_NAME, Arrays.asList(this.getSecrets().toArray(Class[]::new)));
|
||||
result.put(TASK_RUNNERS_GROUP_NAME, Arrays.asList(this.getTaskRunners().toArray(Class[]::new)));
|
||||
result.put(ASSETS_GROUP_NAME, Arrays.asList(this.getAssets().toArray(Class[]::new)));
|
||||
result.put(APPS_GROUP_NAME, Arrays.asList(this.getApps().toArray(Class[]::new)));
|
||||
result.put(APP_BLOCKS_GROUP_NAME, Arrays.asList(this.getAppBlocks().toArray(Class[]::new)));
|
||||
result.put(CHARTS_GROUP_NAME, Arrays.asList(this.getCharts().toArray(Class[]::new)));
|
||||
@@ -359,6 +368,12 @@ public class RegisteredPlugin {
|
||||
b.append("] ");
|
||||
}
|
||||
|
||||
if (!this.getAssets().isEmpty()) {
|
||||
b.append("[Assets: ");
|
||||
b.append(this.getAssets().stream().map(Class::getName).collect(Collectors.joining(", ")));
|
||||
b.append("] ");
|
||||
}
|
||||
|
||||
if (!this.getApps().isEmpty()) {
|
||||
b.append("[Apps: ");
|
||||
b.append(this.getApps().stream().map(Class::getName).collect(Collectors.joining(", ")));
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
package io.kestra.core.plugins.notifications;
|
||||
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public interface ExecutionInterface {
|
||||
@Schema(
|
||||
title = "The execution id to use",
|
||||
description = "Default is the current execution, " +
|
||||
"change it to {{ trigger.executionId }} if you use this task with a Flow trigger to use the original execution."
|
||||
)
|
||||
Property<String> getExecutionId();
|
||||
|
||||
@Schema(
|
||||
title = "Custom fields to be added on notification"
|
||||
)
|
||||
Property<Map<String, Object>> getCustomFields();
|
||||
|
||||
@Schema(
|
||||
title = "Custom message to be added on notification"
|
||||
)
|
||||
Property<String> getCustomMessage();
|
||||
}
|
||||
@@ -0,0 +1,140 @@
|
||||
package io.kestra.core.plugins.notifications;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.retrys.Exponential;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.RetryUtils;
|
||||
import io.kestra.core.utils.UriProvider;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
|
||||
public final class ExecutionService {
|
||||
private ExecutionService() {}
|
||||
|
||||
public static Execution findExecution(RunContext runContext, Property<String> executionId) throws IllegalVariableEvaluationException, NoSuchElementException {
|
||||
ExecutionRepositoryInterface executionRepository = ((DefaultRunContext) runContext).getApplicationContext().getBean(ExecutionRepositoryInterface.class);
|
||||
|
||||
RetryUtils.Instance<Execution, NoSuchElementException> retryInstance = RetryUtils
|
||||
.of(Exponential.builder()
|
||||
.delayFactor(2.0)
|
||||
.interval(Duration.ofSeconds(1))
|
||||
.maxInterval(Duration.ofSeconds(15))
|
||||
.maxAttempts(-1)
|
||||
.maxDuration(Duration.ofMinutes(10))
|
||||
.build(),
|
||||
runContext.logger()
|
||||
);
|
||||
|
||||
var executionRendererId = runContext.render(executionId).as(String.class).orElse(null);
|
||||
var flowTriggerExecutionState = getOptionalFlowTriggerExecutionState(runContext);
|
||||
|
||||
var flowVars = (Map<String, String>) runContext.getVariables().get("flow");
|
||||
var isCurrentExecution = isCurrentExecution(runContext, executionRendererId);
|
||||
if (isCurrentExecution) {
|
||||
runContext.logger().info("Loading execution data for the current execution.");
|
||||
}
|
||||
|
||||
return retryInstance.run(
|
||||
NoSuchElementException.class,
|
||||
() -> executionRepository.findById(flowVars.get("tenantId"), executionRendererId)
|
||||
.filter(foundExecution -> isExecutionInTheWantedState(foundExecution, isCurrentExecution, flowTriggerExecutionState))
|
||||
.orElseThrow(() -> new NoSuchElementException("Unable to find execution '" + executionRendererId + "'"))
|
||||
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* ExecutionRepository can be out of sync in ElasticSearch stack, with this filter we try to mitigate that
|
||||
*
|
||||
* @param execution the Execution we fetched from ExecutionRepository
|
||||
* @param isCurrentExecution true if this *Execution Task is configured to send a notification for the current Execution
|
||||
* @param flowTriggerExecutionState the Execution State that triggered the Flow trigger, if any
|
||||
* @return true if we think we fetched the right Execution data for our usecase
|
||||
*/
|
||||
public static boolean isExecutionInTheWantedState(Execution execution, boolean isCurrentExecution, Optional<String> flowTriggerExecutionState) {
|
||||
if (isCurrentExecution) {
|
||||
// we don't wait for current execution to be terminated as it could not be possible as long as this task is running
|
||||
return true;
|
||||
}
|
||||
|
||||
if (flowTriggerExecutionState.isPresent()) {
|
||||
// we were triggered by a Flow trigger that can be, for example: PAUSED
|
||||
if (flowTriggerExecutionState.get().equals(State.Type.RUNNING.toString())) {
|
||||
// RUNNING special case: we take the first state we got
|
||||
return true;
|
||||
} else {
|
||||
// to handle the case where the ExecutionRepository is out of sync in ElasticSearch stack,
|
||||
// we try to match an Execution with the same state
|
||||
return execution.getState().getCurrent().name().equals(flowTriggerExecutionState.get());
|
||||
}
|
||||
} else {
|
||||
return execution.getState().getCurrent().isTerminated();
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, Object> executionMap(RunContext runContext, ExecutionInterface executionInterface) throws IllegalVariableEvaluationException {
|
||||
Execution execution = findExecution(runContext, executionInterface.getExecutionId());
|
||||
UriProvider uriProvider = ((DefaultRunContext) runContext).getApplicationContext().getBean(UriProvider.class);
|
||||
|
||||
Map<String, Object> templateRenderMap = new HashMap<>();
|
||||
templateRenderMap.put("duration", execution.getState().humanDuration());
|
||||
templateRenderMap.put("startDate", execution.getState().getStartDate());
|
||||
templateRenderMap.put("link", uriProvider.executionUrl(execution));
|
||||
templateRenderMap.put("execution", JacksonMapper.toMap(execution));
|
||||
|
||||
runContext.render(executionInterface.getCustomMessage())
|
||||
.as(String.class)
|
||||
.ifPresent(s -> templateRenderMap.put("customMessage", s));
|
||||
|
||||
final Map<String, Object> renderedCustomFields = runContext.render(executionInterface.getCustomFields()).asMap(String.class, Object.class);
|
||||
if (!renderedCustomFields.isEmpty()) {
|
||||
templateRenderMap.put("customFields", renderedCustomFields);
|
||||
}
|
||||
|
||||
var isCurrentExecution = isCurrentExecution(runContext, execution.getId());
|
||||
|
||||
List<TaskRun> taskRuns;
|
||||
|
||||
if (isCurrentExecution) {
|
||||
taskRuns = execution.getTaskRunList();
|
||||
} else {
|
||||
taskRuns = execution.getTaskRunList().stream()
|
||||
.filter(t -> (execution.hasFailed() ? State.Type.FAILED : State.Type.SUCCESS).equals(t.getState().getCurrent()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
if (!ListUtils.isEmpty(taskRuns)) {
|
||||
TaskRun lastTaskRun = taskRuns.getLast();
|
||||
templateRenderMap.put("firstFailed", State.Type.FAILED.equals(lastTaskRun.getState().getCurrent()) ? lastTaskRun : false);
|
||||
templateRenderMap.put("lastTask", lastTaskRun);
|
||||
}
|
||||
|
||||
return templateRenderMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* if there is a state, we assume this is a Flow trigger with type: {@link io.kestra.plugin.core.trigger.Flow.Output}
|
||||
*
|
||||
* @return the state of the execution that triggered the Flow trigger, or empty if another usecase/trigger
|
||||
*/
|
||||
private static Optional<String> getOptionalFlowTriggerExecutionState(RunContext runContext) {
|
||||
var triggerVar = Optional.ofNullable(
|
||||
runContext.getVariables().get("trigger")
|
||||
);
|
||||
return triggerVar.map(trigger -> ((Map<String, String>) trigger).get("state"));
|
||||
}
|
||||
|
||||
private static boolean isCurrentExecution(RunContext runContext, String executionId) {
|
||||
var executionVars = (Map<String, String>) runContext.getVariables().get("execution");
|
||||
return executionId.equals(executionVars.get("id"));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package io.kestra.core.plugins.serdes;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonDeserializer;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.CustomAsset;
|
||||
|
||||
/**
|
||||
* Specific {@link JsonDeserializer} for deserializing {@link Asset}.
|
||||
*/
|
||||
public final class AssetDeserializer extends PluginDeserializer<Asset> {
|
||||
@Override
|
||||
protected Class<? extends Plugin> fallbackClass() {
|
||||
return CustomAsset.class;
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,7 @@ import io.kestra.core.models.dashboards.charts.DataChart;
|
||||
import io.kestra.core.plugins.DefaultPluginRegistry;
|
||||
import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.micronaut.context.exceptions.NoSuchBeanException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -28,7 +29,7 @@ import java.util.Optional;
|
||||
* The {@link PluginDeserializer} uses the {@link PluginRegistry} to found the plugin class corresponding to
|
||||
* a plugin type.
|
||||
*/
|
||||
public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
|
||||
public class PluginDeserializer<T extends Plugin> extends JsonDeserializer<T> {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PluginDeserializer.class);
|
||||
|
||||
@@ -72,7 +73,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
|
||||
// By default, if no plugin-registry is configured retrieve
|
||||
// the one configured from the static Kestra's context.
|
||||
pluginRegistry = KestraContext.getContext().getPluginRegistry();
|
||||
} catch (IllegalStateException ignore) {
|
||||
} catch (IllegalStateException | NoSuchBeanException ignore) {
|
||||
// This error can only happen if the KestraContext is not initialized (i.e. in unit tests).
|
||||
log.error("No plugin registry was initialized. Use default implementation.");
|
||||
pluginRegistry = DefaultPluginRegistry.getOrCreate();
|
||||
@@ -92,6 +93,10 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
|
||||
identifier
|
||||
);
|
||||
pluginType = pluginRegistry.findClassByIdentifier(identifier);
|
||||
|
||||
if (pluginType == null) {
|
||||
pluginType = fallbackClass();
|
||||
}
|
||||
}
|
||||
|
||||
if (pluginType == null) {
|
||||
@@ -152,4 +157,8 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
|
||||
|
||||
return isVersioningSupported && version != null && !version.isEmpty() ? type + ":" + version : type;
|
||||
}
|
||||
|
||||
protected Class<? extends Plugin> fallbackClass() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.models.Setting;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
public interface SettingRepositoryInterface {
|
||||
Optional<Setting> findByKey(String key);
|
||||
@@ -13,5 +13,7 @@ public interface SettingRepositoryInterface {
|
||||
|
||||
Setting save(Setting setting) throws ConstraintViolationException;
|
||||
|
||||
Setting internalSave(Setting setting) throws ConstraintViolationException;
|
||||
|
||||
Setting delete(Setting setting);
|
||||
}
|
||||
|
||||
@@ -16,8 +16,8 @@ import java.util.function.Function;
|
||||
public interface TriggerRepositoryInterface extends QueryBuilderInterface<Triggers.Fields> {
|
||||
Optional<Trigger> findLast(TriggerContext trigger);
|
||||
|
||||
Optional<Trigger> findByExecution(Execution execution);
|
||||
|
||||
Optional<Trigger> findByUid(String uid);
|
||||
|
||||
List<Trigger> findAll(String tenantId);
|
||||
|
||||
List<Trigger> findAllForAllTenants();
|
||||
|
||||
12
core/src/main/java/io/kestra/core/runners/AssetEmitter.java
Normal file
12
core/src/main/java/io/kestra/core/runners/AssetEmitter.java
Normal file
@@ -0,0 +1,12 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface AssetEmitter {
|
||||
void upsert(Asset asset) throws QueueException;
|
||||
|
||||
List<Asset> outputs();
|
||||
}
|
||||
@@ -6,10 +6,14 @@ import com.google.common.base.CaseFormat;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.assets.AssetsDeclaration;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.services.AssetManagerFactory;
|
||||
import io.kestra.core.plugins.PluginConfigurations;
|
||||
import io.kestra.core.services.KVStoreService;
|
||||
import io.kestra.core.storages.Storage;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -52,6 +56,7 @@ public class DefaultRunContext extends RunContext {
|
||||
private MetricRegistry meterRegistry;
|
||||
private VersionProvider version;
|
||||
private KVStoreService kvStoreService;
|
||||
private AssetManagerFactory assetManagerFactory;
|
||||
private Optional<String> secretKey;
|
||||
private WorkingDir workingDir;
|
||||
private Validator validator;
|
||||
@@ -71,6 +76,8 @@ public class DefaultRunContext extends RunContext {
|
||||
private Task task;
|
||||
private AbstractTrigger trigger;
|
||||
|
||||
private volatile AssetEmitter assetEmitter;
|
||||
|
||||
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
|
||||
|
||||
|
||||
@@ -159,6 +166,7 @@ public class DefaultRunContext extends RunContext {
|
||||
this.secretKey = applicationContext.getProperty("kestra.encryption.secret-key", String.class);
|
||||
this.validator = applicationContext.getBean(Validator.class);
|
||||
this.localPath = applicationContext.getBean(LocalPathFactory.class).createLocalPath(this);
|
||||
this.assetManagerFactory = applicationContext.getBean(AssetManagerFactory.class);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -235,6 +243,14 @@ public class DefaultRunContext extends RunContext {
|
||||
return runContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RunContext cloneForPlugin(Plugin plugin) {
|
||||
PluginConfigurations pluginConfigurations = applicationContext.getBean(PluginConfigurations.class);
|
||||
DefaultRunContext runContext = clone();
|
||||
runContext.pluginConfiguration = pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass());
|
||||
return runContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@@ -527,6 +543,23 @@ public class DefaultRunContext extends RunContext {
|
||||
return flow != null ? flow.get("tenantId") : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public TaskRunInfo taskRunInfo() {
|
||||
Optional<Map<String, Object>> maybeTaskRunMap = Optional.ofNullable(this.getVariables().get("taskrun"))
|
||||
.map(Map.class::cast);
|
||||
return new TaskRunInfo(
|
||||
(String) this.getVariables().get("executionId"),
|
||||
(String) this.getVariables().get("taskId"),
|
||||
maybeTaskRunMap.map(m -> (String) m.get("id"))
|
||||
.orElse(null),
|
||||
maybeTaskRunMap.map(m -> (String) m.get("value"))
|
||||
.orElse(null)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@@ -535,12 +568,7 @@ public class DefaultRunContext extends RunContext {
|
||||
public FlowInfo flowInfo() {
|
||||
Map<String, Object> flow = (Map<String, Object>) this.getVariables().get("flow");
|
||||
// normally only tests should not have the flow variable
|
||||
return flow == null ? new FlowInfo(null, null, null, null) : new FlowInfo(
|
||||
(String) flow.get("tenantId"),
|
||||
(String) flow.get("namespace"),
|
||||
(String) flow.get("id"),
|
||||
(Integer) flow.get("revision")
|
||||
);
|
||||
return flow == null ? new FlowInfo(null, null, null, null) : FlowInfo.from(flow);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -584,11 +612,35 @@ public class DefaultRunContext extends RunContext {
|
||||
return new AclCheckerImpl(this.applicationContext, flowInfo());
|
||||
}
|
||||
|
||||
@Override
|
||||
public AssetEmitter assets() throws IllegalVariableEvaluationException {
|
||||
if (this.assetEmitter == null) {
|
||||
synchronized (this) {
|
||||
if (this.assetEmitter == null) {
|
||||
this.assetEmitter = assetManagerFactory.of(
|
||||
Optional.ofNullable(task).map(Task::getAssets)
|
||||
.or(() -> Optional.ofNullable(trigger).map(AbstractTrigger::getAssets))
|
||||
.flatMap(throwFunction(asset -> this.render(asset).as(AssetsDeclaration.class)))
|
||||
.map(AssetsDeclaration::isEnableAuto)
|
||||
.orElse(false)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return this.assetEmitter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LocalPath localPath() {
|
||||
return localPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputAndOutput inputAndOutput() {
|
||||
return new InputAndOutputImpl(this.applicationContext, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder class for constructing new {@link DefaultRunContext} objects.
|
||||
*/
|
||||
|
||||
@@ -189,12 +189,11 @@ public final class ExecutableUtils {
|
||||
variables.put("taskRunIteration", currentTaskRun.getIteration());
|
||||
}
|
||||
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
|
||||
Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
|
||||
Execution execution = Execution
|
||||
.newExecution(
|
||||
flow,
|
||||
(f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
|
||||
(f, e) -> runContext.inputAndOutput().readInputs(f, e, inputs),
|
||||
newLabels,
|
||||
Optional.empty())
|
||||
.withTrigger(ExecutionTrigger.builder()
|
||||
|
||||
@@ -3,13 +3,13 @@ package io.kestra.core.runners;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Data;
|
||||
import io.kestra.core.models.flows.DependsOn;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
import io.kestra.core.models.flows.RenderableInput;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.FileInput;
|
||||
@@ -19,7 +19,6 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.property.URIFetcher;
|
||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -158,11 +157,7 @@ public class FlowInputOutput {
|
||||
File tempFile = File.createTempFile(prefix, fileExtension);
|
||||
try (var inputStream = fileUpload.getInputStream();
|
||||
var outputStream = new FileOutputStream(tempFile)) {
|
||||
long transferredBytes = inputStream.transferTo(outputStream);
|
||||
if (transferredBytes == 0) {
|
||||
sink.error(new KestraRuntimeException("Can't upload file: " + fileUpload.getFilename()));
|
||||
return;
|
||||
}
|
||||
inputStream.transferTo(outputStream);
|
||||
URI from = storageInterface.from(execution, inputId, fileName, tempFile);
|
||||
sink.next(Map.entry(inputId, from.toString()));
|
||||
} finally {
|
||||
@@ -213,8 +208,8 @@ public class FlowInputOutput {
|
||||
.filter(InputAndValue::enabled)
|
||||
.map(it -> {
|
||||
//TODO check to return all exception at-once.
|
||||
if (it.exception() != null) {
|
||||
throw it.exception();
|
||||
if (it.exceptions() != null && !it.exceptions().isEmpty()) {
|
||||
throw InputOutputValidationException.merge(it.exceptions());
|
||||
}
|
||||
return new AbstractMap.SimpleEntry<>(it.input().getId(), it.value());
|
||||
})
|
||||
@@ -298,13 +293,9 @@ public class FlowInputOutput {
|
||||
try {
|
||||
isInputEnabled = Boolean.TRUE.equals(runContext.renderTyped(dependsOnCondition.get()));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
resolvable.resolveWithError(ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid condition: " + e.getMessage(),
|
||||
input,
|
||||
(Class<Input>)input.getClass(),
|
||||
input.getId(),
|
||||
this
|
||||
));
|
||||
resolvable.resolveWithError(
|
||||
InputOutputValidationException.of("Invalid condition: " + e.getMessage())
|
||||
);
|
||||
isInputEnabled = false;
|
||||
}
|
||||
}
|
||||
@@ -337,7 +328,7 @@ public class FlowInputOutput {
|
||||
// validate and parse input value
|
||||
if (value == null) {
|
||||
if (input.getRequired()) {
|
||||
resolvable.resolveWithError(input.toConstraintViolationException("missing required input", null));
|
||||
resolvable.resolveWithError(InputOutputValidationException.of("Missing required input:" + input.getId()));
|
||||
} else {
|
||||
resolvable.resolveWithValue(null);
|
||||
}
|
||||
@@ -347,17 +338,18 @@ public class FlowInputOutput {
|
||||
parsedInput.ifPresent(parsed -> ((Input) resolvable.get().input()).validate(parsed.getValue()));
|
||||
parsedInput.ifPresent(typed -> resolvable.resolveWithValue(typed.getValue()));
|
||||
} catch (ConstraintViolationException e) {
|
||||
ConstraintViolationException exception = e.getConstraintViolations().size() == 1 ?
|
||||
input.toConstraintViolationException(List.copyOf(e.getConstraintViolations()).getFirst().getMessage(), value) :
|
||||
input.toConstraintViolationException(e.getMessage(), value);
|
||||
resolvable.resolveWithError(exception);
|
||||
Input<?> finalInput = input;
|
||||
Set<InputOutputValidationException> exceptions = e.getConstraintViolations().stream()
|
||||
.map(c-> InputOutputValidationException.of(c.getMessage(), finalInput))
|
||||
.collect(Collectors.toSet());
|
||||
resolvable.resolveWithError(exceptions);
|
||||
}
|
||||
}
|
||||
} catch (ConstraintViolationException e) {
|
||||
resolvable.resolveWithError(e);
|
||||
} catch (Exception e) {
|
||||
ConstraintViolationException exception = input.toConstraintViolationException(e instanceof IllegalArgumentException ? e.getMessage() : e.toString(), resolvable.get().value());
|
||||
resolvable.resolveWithError(exception);
|
||||
} catch (IllegalArgumentException e){
|
||||
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage(), input));
|
||||
}
|
||||
catch (Exception e) {
|
||||
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage()));
|
||||
}
|
||||
|
||||
return resolvable.get();
|
||||
@@ -382,11 +374,11 @@ public class FlowInputOutput {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return Property.as((Property<T>) input.getDefaults(), renderer, clazz);
|
||||
return Property.as((Property<T>) input.getDefaults().skipCache(), renderer, clazz);
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
|
||||
return Property.asList((Property<List<T>>) input.getDefaults().skipCache(), renderer, clazz);
|
||||
}
|
||||
|
||||
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies, final boolean decryptSecrets) {
|
||||
@@ -445,8 +437,12 @@ public class FlowInputOutput {
|
||||
}
|
||||
return entry;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
throw output.toConstraintViolationException(e.getMessage(), current);
|
||||
}
|
||||
catch (IllegalArgumentException e){
|
||||
throw InputOutputValidationException.of(e.getMessage(), output);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw InputOutputValidationException.of(e.getMessage());
|
||||
}
|
||||
})
|
||||
.filter(Optional::isPresent)
|
||||
@@ -502,14 +498,14 @@ public class FlowInputOutput {
|
||||
yield storageInterface.from(execution, id, current.toString().substring(current.toString().lastIndexOf("/") + 1), new File(current.toString()));
|
||||
}
|
||||
}
|
||||
case JSON -> JacksonMapper.toObject(current.toString());
|
||||
case YAML -> YAML_MAPPER.readValue(current.toString(), JacksonMapper.OBJECT_TYPE_REFERENCE);
|
||||
case JSON -> (current instanceof Map || current instanceof Collection<?>) ? current : JacksonMapper.toObject(current.toString());
|
||||
case YAML -> (current instanceof Map || current instanceof Collection<?>) ? current : YAML_MAPPER.readValue(current.toString(), JacksonMapper.OBJECT_TYPE_REFERENCE);
|
||||
case URI -> {
|
||||
Matcher matcher = URI_PATTERN.matcher(current.toString());
|
||||
if (matcher.matches()) {
|
||||
yield current.toString();
|
||||
} else {
|
||||
throw new IllegalArgumentException("Expected `URI` but received `" + current + "`");
|
||||
throw new IllegalArgumentException("Invalid URI format.");
|
||||
}
|
||||
}
|
||||
case ARRAY, MULTISELECT -> {
|
||||
@@ -539,34 +535,10 @@ public class FlowInputOutput {
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw e;
|
||||
} catch (Throwable e) {
|
||||
throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
|
||||
throw new Exception(" errors:\n```\n" + e.getMessage() + "\n```");
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
|
||||
if (outputs == null) return Map.of();
|
||||
|
||||
// render required outputs
|
||||
Map<String, Object> outputsById = outputs
|
||||
.stream()
|
||||
.filter(output -> output.getRequired() == null || output.getRequired())
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||
outputsById = runContext.render(outputsById);
|
||||
|
||||
// render optional outputs one by one to catch, log, and skip any error.
|
||||
for (io.kestra.core.models.flows.Output output : outputs) {
|
||||
if (Boolean.FALSE.equals(output.getRequired())) {
|
||||
try {
|
||||
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
|
||||
outputsById.put(output.getId(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
return outputsById;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mutable wrapper to hold a flow's input, and it's resolved value.
|
||||
*/
|
||||
@@ -595,27 +567,30 @@ public class FlowInputOutput {
|
||||
}
|
||||
|
||||
public void isDefault(boolean isDefault) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exceptions());
|
||||
}
|
||||
|
||||
public void setInput(final Input<?> input) {
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exceptions());
|
||||
}
|
||||
|
||||
public void resolveWithEnabled(boolean enabled) {
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exceptions());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithValue(@Nullable Object value) {
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exceptions());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithError(@Nullable ConstraintViolationException exception) {
|
||||
public void resolveWithError(@Nullable Set<InputOutputValidationException> exception) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
|
||||
markAsResolved();
|
||||
}
|
||||
private void resolveWithError(@Nullable InputOutputValidationException exception){
|
||||
resolveWithError(Collections.singleton(exception));
|
||||
}
|
||||
|
||||
private void markAsResolved() {
|
||||
this.isResolved = true;
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* InputAndOutput could be used to work with flow execution inputs and outputs.
|
||||
*/
|
||||
public interface InputAndOutput {
|
||||
/**
|
||||
* Reads the inputs of a flow execution.
|
||||
*/
|
||||
Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs);
|
||||
|
||||
/**
|
||||
* Processes the outputs of a flow execution (parse them based on their types).
|
||||
*/
|
||||
Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs);
|
||||
|
||||
/**
|
||||
* Render flow execution outputs.
|
||||
*/
|
||||
Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException;
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
class InputAndOutputImpl implements InputAndOutput {
|
||||
private final FlowInputOutput flowInputOutput;
|
||||
private final RunContext runContext;
|
||||
|
||||
InputAndOutputImpl(ApplicationContext applicationContext, RunContext runContext) {
|
||||
this.flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
|
||||
this.runContext = runContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs) {
|
||||
return flowInputOutput.readExecutionInputs(flow, execution, inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs) {
|
||||
return flowInputOutput.typedOutputs(flow, execution, rOutputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException {
|
||||
if (outputs == null) return Map.of();
|
||||
|
||||
// render required outputs
|
||||
Map<String, Object> outputsById = outputs
|
||||
.stream()
|
||||
.filter(output -> output.getRequired() == null || output.getRequired())
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||
outputsById = runContext.render(outputsById);
|
||||
|
||||
// render optional outputs one by one to catch, log, and skip any error.
|
||||
for (io.kestra.core.models.flows.Output output : outputs) {
|
||||
if (Boolean.FALSE.equals(output.getRequired())) {
|
||||
try {
|
||||
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
|
||||
outputsById.put(output.getId(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
return outputsById;
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
@@ -142,6 +143,8 @@ public abstract class RunContext implements PropertyContext {
|
||||
@Deprecated(forRemoval = true)
|
||||
public abstract String tenantId();
|
||||
|
||||
public abstract TaskRunInfo taskRunInfo();
|
||||
|
||||
public abstract FlowInfo flowInfo();
|
||||
|
||||
/**
|
||||
@@ -189,7 +192,19 @@ public abstract class RunContext implements PropertyContext {
|
||||
*/
|
||||
public abstract LocalPath localPath();
|
||||
|
||||
public record TaskRunInfo(String executionId, String taskId, String taskRunId, Object value) {
|
||||
|
||||
}
|
||||
|
||||
public record FlowInfo(String tenantId, String namespace, String id, Integer revision) {
|
||||
public static FlowInfo from(Map<String, Object> flowInfoMap) {
|
||||
return new FlowInfo(
|
||||
(String) flowInfoMap.get("tenantId"),
|
||||
(String) flowInfoMap.get("namespace"),
|
||||
(String) flowInfoMap.get("id"),
|
||||
(Integer) flowInfoMap.get("revision")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -204,4 +219,20 @@ public abstract class RunContext implements PropertyContext {
|
||||
* when Namespace ACLs are used (EE).
|
||||
*/
|
||||
public abstract AclChecker acl();
|
||||
|
||||
/**
|
||||
* Get access to the Assets handler.
|
||||
*/
|
||||
public abstract AssetEmitter assets() throws IllegalVariableEvaluationException;
|
||||
|
||||
/**
|
||||
* Clone this run context for a specific plugin.
|
||||
* @return a new run context with the plugin configuration of the given plugin.
|
||||
*/
|
||||
public abstract RunContext cloneForPlugin(Plugin plugin);
|
||||
|
||||
/**
|
||||
* @return an InputAndOutput that can be used to work with inputs and outputs.
|
||||
*/
|
||||
public abstract InputAndOutput inputAndOutput();
|
||||
}
|
||||
|
||||
@@ -171,14 +171,16 @@ public class RunContextFactory {
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
public RunContext of(final FlowInterface flow, final Map<String, Object> variables) {
|
||||
RunContextLogger runContextLogger = new RunContextLogger();
|
||||
return newBuilder()
|
||||
.withLogger(runContextLogger)
|
||||
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forFlow(flow), storageInterface, namespaceService, namespaceFactory))
|
||||
.withVariables(variables)
|
||||
.withVariables(newRunVariablesBuilder()
|
||||
.withFlow(flow)
|
||||
.withVariables(variables)
|
||||
.build(runContextLogger, PropertyContext.create(this.variableRenderer))
|
||||
)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.build();
|
||||
}
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.runners.TaskRunner;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.plugins.PluginConfigurations;
|
||||
@@ -53,20 +51,6 @@ public class RunContextInitializer {
|
||||
@Value("${kestra.encryption.secret-key}")
|
||||
protected Optional<String> secretKey;
|
||||
|
||||
/**
|
||||
* Initializes the given {@link RunContext} for the given {@link Plugin}.
|
||||
*
|
||||
* @param runContext The {@link RunContext} to initialize.
|
||||
* @param plugin The {@link TaskRunner} used for initialization.
|
||||
* @return The {@link RunContext} to initialize
|
||||
*/
|
||||
public DefaultRunContext forPlugin(final DefaultRunContext runContext,
|
||||
final Plugin plugin) {
|
||||
runContext.init(applicationContext);
|
||||
runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass()));
|
||||
return runContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the given {@link RunContext} for the given {@link WorkerTask} for executor.
|
||||
*
|
||||
|
||||
@@ -55,11 +55,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
||||
|
||||
public RunContextLogger(QueueInterface<LogEntry> logQueue, LogEntry logEntry, org.slf4j.event.Level loglevel, boolean logToFile) {
|
||||
if (logEntry.getTaskId() != null) {
|
||||
this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTaskId();
|
||||
this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTaskId();
|
||||
} else if (logEntry.getTriggerId() != null) {
|
||||
this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTriggerId();
|
||||
this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTriggerId();
|
||||
} else {
|
||||
this.loggerName = "flow." + logEntry.getFlowId();
|
||||
this.loggerName = baseLoggerName(logEntry);
|
||||
}
|
||||
|
||||
this.logQueue = logQueue;
|
||||
@@ -68,6 +68,10 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
||||
this.logToFile = logToFile;
|
||||
}
|
||||
|
||||
private String baseLoggerName(LogEntry logEntry) {
|
||||
return "flow." + logEntry.getTenantId() + "." + logEntry.getNamespace() + "." + logEntry.getFlowId();
|
||||
}
|
||||
|
||||
private static List<LogEntry> logEntry(ILoggingEvent event, String message, org.slf4j.event.Level level, LogEntry logEntry) {
|
||||
Iterable<String> split;
|
||||
|
||||
|
||||
@@ -160,7 +160,7 @@ public class RunnerUtils {
|
||||
executionEmitter.run();
|
||||
|
||||
if (duration == null) {
|
||||
Await.untilWithSleepInterval(() -> receive.get() != null, Duration.ofMillis(10));
|
||||
Await.until(() -> receive.get() != null, Duration.ofMillis(10));
|
||||
} else {
|
||||
Await.until(() -> receive.get() != null, Duration.ofMillis(10), duration);
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
@@ -10,6 +11,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.With;
|
||||
|
||||
@Value
|
||||
@AllArgsConstructor
|
||||
@@ -21,8 +23,7 @@ public class WorkerTaskResult implements HasUID {
|
||||
List<TaskRun> dynamicTaskRuns;
|
||||
|
||||
public WorkerTaskResult(TaskRun taskRun) {
|
||||
this.taskRun = taskRun;
|
||||
this.dynamicTaskRuns = new ArrayList<>();
|
||||
this(taskRun, new ArrayList<>());
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -2,11 +2,13 @@ package io.kestra.core.runners.pebble;
|
||||
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.pebbletemplates.pebble.PebbleEngine;
|
||||
import io.pebbletemplates.pebble.extension.Extension;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import io.pebbletemplates.pebble.lexer.Syntax;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@@ -18,35 +20,44 @@ import java.util.stream.Collectors;
|
||||
|
||||
@Singleton
|
||||
public class PebbleEngineFactory {
|
||||
|
||||
|
||||
private final ApplicationContext applicationContext;
|
||||
private final VariableRenderer.VariableConfiguration variableConfiguration;
|
||||
|
||||
private final MeterRegistry meterRegistry;
|
||||
|
||||
@Inject
|
||||
public PebbleEngineFactory(ApplicationContext applicationContext, @Nullable VariableRenderer.VariableConfiguration variableConfiguration) {
|
||||
public PebbleEngineFactory(ApplicationContext applicationContext, @Nullable VariableRenderer.VariableConfiguration variableConfiguration, MeterRegistry meterRegistry) {
|
||||
this.applicationContext = applicationContext;
|
||||
this.variableConfiguration = variableConfiguration;
|
||||
this.meterRegistry = meterRegistry;
|
||||
}
|
||||
|
||||
|
||||
public PebbleEngine create() {
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder();
|
||||
this.applicationContext.getBeansOfType(Extension.class).forEach(builder::extension);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
||||
public PebbleEngine createWithCustomSyntax(Syntax syntax, Class<? extends Extension> extension) {
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder()
|
||||
.syntax(syntax);
|
||||
this.applicationContext.getBeansOfType(extension).forEach(builder::extension);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
|
||||
|
||||
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder();
|
||||
|
||||
|
||||
this.applicationContext.getBeansOfType(Extension.class).stream()
|
||||
.map(e -> functionsToMask.stream().anyMatch(fun -> e.getFunctions().containsKey(fun))
|
||||
? extensionWithMaskedFunctions(renderer, e, functionsToMask)
|
||||
: e)
|
||||
.forEach(builder::extension);
|
||||
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
||||
private PebbleEngine.Builder newPebbleEngineBuilder() {
|
||||
PebbleEngine.Builder builder = new PebbleEngine.Builder()
|
||||
.registerExtensionCustomizer(ExtensionCustomizer::new)
|
||||
@@ -54,13 +65,15 @@ public class PebbleEngineFactory {
|
||||
.cacheActive(this.variableConfiguration.getCacheEnabled())
|
||||
.newLineTrimming(false)
|
||||
.autoEscaping(false);
|
||||
|
||||
|
||||
if (this.variableConfiguration.getCacheEnabled()) {
|
||||
builder = builder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
|
||||
PebbleLruCache cache = new PebbleLruCache(this.variableConfiguration.getCacheSize());
|
||||
cache.register(meterRegistry);
|
||||
builder = builder.templateCache(cache);
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
|
||||
private Extension extensionWithMaskedFunctions(VariableRenderer renderer, Extension initialExtension, List<String> maskedFunctions) {
|
||||
return (Extension) Proxy.newProxyInstance(
|
||||
initialExtension.getClass().getClassLoader(),
|
||||
@@ -74,16 +87,16 @@ public class PebbleEngineFactory {
|
||||
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
|
||||
return Map.entry(entry.getKey(), this.variableRendererProxy(renderer, entry.getValue()));
|
||||
}
|
||||
|
||||
|
||||
return entry;
|
||||
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
}
|
||||
|
||||
|
||||
return method.invoke(initialExtension, methodArgs);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
private Function variableRendererProxy(VariableRenderer renderer, Function initialFunction) {
|
||||
return (Function) Proxy.newProxyInstance(
|
||||
initialFunction.getClass().getClassLoader(),
|
||||
@@ -96,7 +109,7 @@ public class PebbleEngineFactory {
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
private Function maskedFunctionProxy(Function initialFunction) {
|
||||
return (Function) Proxy.newProxyInstance(
|
||||
initialFunction.getClass().getClassLoader(),
|
||||
|
||||
@@ -1,29 +1,29 @@
|
||||
package io.kestra.core.runners.pebble;
|
||||
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
|
||||
import io.pebbletemplates.pebble.cache.PebbleCache;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplate;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.function.Function;
|
||||
|
||||
@Slf4j
|
||||
public class PebbleLruCache implements PebbleCache<Object, PebbleTemplate> {
|
||||
Cache<Object, PebbleTemplate> cache;
|
||||
private final Cache<Object, PebbleTemplate> cache;
|
||||
|
||||
public PebbleLruCache(int maximumSize) {
|
||||
cache = CacheBuilder.newBuilder()
|
||||
cache = Caffeine.newBuilder()
|
||||
.initialCapacity(250)
|
||||
.maximumSize(maximumSize)
|
||||
.recordStats()
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PebbleTemplate computeIfAbsent(Object key, Function<? super Object, ? extends PebbleTemplate> mappingFunction) {
|
||||
try {
|
||||
return cache.get(key, () -> mappingFunction.apply(key));
|
||||
return cache.get(key, mappingFunction);
|
||||
} catch (Exception e) {
|
||||
// we retry the mapping function in order to let the exception be thrown instead of being capture by cache
|
||||
return mappingFunction.apply(key);
|
||||
@@ -34,4 +34,8 @@ public class PebbleLruCache implements PebbleCache<Object, PebbleTemplate> {
|
||||
public void invalidateAll() {
|
||||
cache.invalidateAll();
|
||||
}
|
||||
|
||||
public void register(MeterRegistry meterRegistry) {
|
||||
CaffeineCacheMetrics.monitor(meterRegistry, cache, "pebble-template");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,9 +28,6 @@ public class ErrorLogsFunction implements Function {
|
||||
@Inject
|
||||
private PebbleUtils pebbleUtils;
|
||||
|
||||
@Inject
|
||||
private RetryUtils retryUtils;
|
||||
|
||||
@Override
|
||||
public List<String> getArgumentNames() {
|
||||
return Collections.emptyList();
|
||||
@@ -46,7 +43,7 @@ public class ErrorLogsFunction implements Function {
|
||||
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
|
||||
Map<String, String> execution = (Map<String, String>) context.getVariable("execution");
|
||||
|
||||
RetryUtils.Instance<List<LogEntry>, Throwable> retry = retryUtils.of(Exponential.builder()
|
||||
RetryUtils.Instance<List<LogEntry>, Throwable> retry = RetryUtils.of(Exponential.builder()
|
||||
.delayFactor(2.0)
|
||||
.interval(Duration.ofMillis(100))
|
||||
.maxInterval(Duration.ofSeconds(1))
|
||||
|
||||
@@ -26,7 +26,14 @@ public class ListOrMapOfLabelDeserializer extends JsonDeserializer<List<Label>>
|
||||
else if (p.hasToken(JsonToken.START_ARRAY)) {
|
||||
// deserialize as list
|
||||
List<Map<String, String>> ret = ctxt.readValue(p, List.class);
|
||||
return ret.stream().map(map -> new Label(map.get("key"), map.get("value"))).toList();
|
||||
return ret.stream().map(map -> {
|
||||
Object value = map.get("value");
|
||||
if (isAllowedType(value)) {
|
||||
return new Label(map.get("key"), String.valueOf(value));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported type for key: " + map.get("key") + ", value: " + value);
|
||||
}
|
||||
}).toList();
|
||||
}
|
||||
else if (p.hasToken(JsonToken.START_OBJECT)) {
|
||||
// deserialize as map
|
||||
|
||||
@@ -35,6 +35,10 @@ public final class YamlParser {
|
||||
return read(input, cls, type(cls));
|
||||
}
|
||||
|
||||
public static <T> T parse(String input, Class<T> cls, Boolean strict) {
|
||||
return strict ? read(input, cls, type(cls)) : readNonStrict(input, cls, type(cls));
|
||||
}
|
||||
|
||||
public static <T> T parse(Map<String, Object> input, Class<T> cls, Boolean strict) {
|
||||
ObjectMapper currentMapper = strict ? STRICT_MAPPER : NON_STRICT_MAPPER;
|
||||
|
||||
@@ -81,7 +85,31 @@ public final class YamlParser {
|
||||
throw toConstraintViolationException(input, resource, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> T readNonStrict(String input, Class<T> objectClass, String resource) {
|
||||
try {
|
||||
return NON_STRICT_MAPPER.readValue(input, objectClass);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw toConstraintViolationException(input, resource, e);
|
||||
}
|
||||
}
|
||||
private static String formatYamlErrorMessage(String originalMessage, JsonProcessingException e) {
|
||||
StringBuilder friendlyMessage = new StringBuilder();
|
||||
if (originalMessage.contains("Expected a field name")) {
|
||||
friendlyMessage.append("YAML syntax error: Invalid structure. Check indentation and ensure all fields are properly formatted.");
|
||||
} else if (originalMessage.contains("MappingStartEvent")) {
|
||||
friendlyMessage.append("YAML syntax error: Unexpected mapping start. Verify that scalar values are properly quoted if needed.");
|
||||
} else if (originalMessage.contains("Scalar value")) {
|
||||
friendlyMessage.append("YAML syntax error: Expected a simple value but found complex structure. Check for unquoted special characters.");
|
||||
} else {
|
||||
friendlyMessage.append("YAML parsing error: ").append(originalMessage.replaceAll("org\\.yaml\\.snakeyaml.*", "").trim());
|
||||
}
|
||||
if (e.getLocation() != null) {
|
||||
int line = e.getLocation().getLineNr();
|
||||
friendlyMessage.append(String.format(" (at line %d)", line));
|
||||
}
|
||||
// Return a generic but cleaner message for other YAML errors
|
||||
return friendlyMessage.toString();
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> ConstraintViolationException toConstraintViolationException(T target, String resource, JsonProcessingException e) {
|
||||
if (e.getCause() instanceof ConstraintViolationException constraintViolationException) {
|
||||
@@ -121,11 +149,12 @@ public final class YamlParser {
|
||||
)
|
||||
));
|
||||
} else {
|
||||
String userFriendlyMessage = formatYamlErrorMessage(e.getMessage(), e);
|
||||
return new ConstraintViolationException(
|
||||
"Illegal " + resource + " source: " + e.getMessage(),
|
||||
"Illegal " + resource + " source: " + userFriendlyMessage,
|
||||
Collections.singleton(
|
||||
ManualConstraintViolation.of(
|
||||
e.getCause() == null ? e.getMessage() : e.getMessage() + "\nCaused by: " + e.getCause().getMessage(),
|
||||
userFriendlyMessage,
|
||||
target,
|
||||
(Class<T>) target.getClass(),
|
||||
"yaml",
|
||||
@@ -136,4 +165,3 @@ public final class YamlParser {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.runners.AssetEmitter;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Singleton
|
||||
public class AssetManagerFactory {
|
||||
public AssetEmitter of(boolean enabled) {
|
||||
return new AssetEmitter() {
|
||||
@Override
|
||||
public void upsert(Asset asset) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Asset> outputs() {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
32
core/src/main/java/io/kestra/core/services/AssetService.java
Normal file
32
core/src/main/java/io/kestra/core/services/AssetService.java
Normal file
@@ -0,0 +1,32 @@
|
||||
package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.assets.AssetIdentifier;
|
||||
import io.kestra.core.models.assets.AssetUser;
|
||||
import io.kestra.core.queues.QueueException;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.inject.Named;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@Singleton
|
||||
public class AssetService implements Runnable {
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
this.run();
|
||||
}
|
||||
|
||||
public void asyncUpsert(AssetUser assetUser, Asset asset) throws QueueException {
|
||||
// No-op
|
||||
}
|
||||
|
||||
public void assetLineage(AssetUser assetUser, List<AssetIdentifier> inputs, List<AssetIdentifier> outputs) throws QueueException {
|
||||
// No-op
|
||||
}
|
||||
|
||||
public void run() {
|
||||
// No-op
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,6 @@ import com.cronutils.utils.VisibleForTesting;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
@@ -65,16 +64,6 @@ public class ConditionService {
|
||||
return this.valid(flow, conditions, conditionContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that all conditions are valid.
|
||||
* Warning, this method throws if a condition cannot be evaluated.
|
||||
*/
|
||||
public boolean isValid(List<ScheduleCondition> conditions, ConditionContext conditionContext) throws InternalException {
|
||||
return conditions
|
||||
.stream()
|
||||
.allMatch(throwPredicate(condition -> condition.test(conditionContext)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that all conditions are valid.
|
||||
* Warning, this method throws if a condition cannot be evaluated.
|
||||
|
||||
@@ -754,7 +754,7 @@ public class ExecutionService {
|
||||
var parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
|
||||
Execution newExecution = execution;
|
||||
if (parentTaskRun.getState().getCurrent() != State.Type.KILLED) {
|
||||
newExecution = newExecution.withTaskRun(parentTaskRun.withState(State.Type.KILLED));
|
||||
newExecution = newExecution.withTaskRun(parentTaskRun.withStateAndAttempt(State.Type.KILLED));
|
||||
}
|
||||
if (parentTaskRun.getParentTaskRunId() != null) {
|
||||
return killParentTaskruns(parentTaskRun, newExecution);
|
||||
|
||||
@@ -92,7 +92,14 @@ public class FlowService {
|
||||
return flowRepository
|
||||
.orElseThrow(() -> new IllegalStateException("Cannot perform operation on flow. Cause: No FlowRepository"));
|
||||
}
|
||||
|
||||
private static String formatValidationError(String message) {
|
||||
if (message.startsWith("Illegal flow source:")) {
|
||||
// Already formatted by YamlParser, return as-is
|
||||
return message;
|
||||
}
|
||||
// For other validation errors, provide context
|
||||
return "Validation error: " + message;
|
||||
}
|
||||
/**
|
||||
* Evaluates all checks defined in the given flow using the provided inputs.
|
||||
* <p>
|
||||
@@ -174,10 +181,12 @@ public class FlowService {
|
||||
modelValidator.validate(pluginDefaultService.injectAllDefaults(flow, false));
|
||||
|
||||
} catch (ConstraintViolationException e) {
|
||||
validateConstraintViolationBuilder.constraints(e.getMessage());
|
||||
String friendlyMessage = formatValidationError(e.getMessage());
|
||||
validateConstraintViolationBuilder.constraints(friendlyMessage);
|
||||
} catch (FlowProcessingException e) {
|
||||
if (e.getCause() instanceof ConstraintViolationException) {
|
||||
validateConstraintViolationBuilder.constraints(e.getMessage());
|
||||
if (e.getCause() instanceof ConstraintViolationException cve) {
|
||||
String friendlyMessage = formatValidationError(cve.getMessage());
|
||||
validateConstraintViolationBuilder.constraints(friendlyMessage);
|
||||
} else {
|
||||
Throwable cause = e.getCause() != null ? e.getCause() : e;
|
||||
validateConstraintViolationBuilder.constraints("Unable to validate the flow: " + cause.getMessage());
|
||||
@@ -579,4 +588,4 @@ public class FlowService {
|
||||
private IllegalStateException noRepositoryException() {
|
||||
return new IllegalStateException("No repository found. Make sure the `kestra.repository.type` property is set.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.storages;
|
||||
|
||||
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
|
||||
import io.kestra.core.services.NamespaceService;
|
||||
import jakarta.annotation.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
@@ -101,6 +100,13 @@ public class InternalStorage implements Storage {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileAttributes getAttributes(URI uri) throws IOException {
|
||||
uriGuard(uri);
|
||||
|
||||
return this.storage.getAttributes(context.getTenantId(), context.getNamespace(), uri);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
**/
|
||||
@@ -265,7 +271,13 @@ public class InternalStorage implements Storage {
|
||||
return this.storage.put(context.getTenantId(), context.getNamespace(), resolve, new BufferedInputStream(inputStream));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<StorageContext.Task> getTaskStorageContext() {
|
||||
return Optional.ofNullable((context instanceof StorageContext.Task task) ? task : null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FileAttributes> list(URI uri) throws IOException {
|
||||
return this.storage.list(context.getTenantId(), context.getNamespace(), uri);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
package io.kestra.core.storages;
|
||||
|
||||
import io.kestra.core.annotations.Retryable;
|
||||
import jakarta.annotation.Nullable;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
@@ -46,6 +48,15 @@ public interface Storage {
|
||||
*/
|
||||
InputStream getFile(URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Retrieves the metadata attributes for the given URI.
|
||||
*
|
||||
* @param uri the URI of the object
|
||||
* @return the file attributes
|
||||
* @throws IOException if the attributes cannot be retrieved
|
||||
*/
|
||||
FileAttributes getAttributes(URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
* Deletes the file for the given URI.
|
||||
* @param uri the file URI.
|
||||
@@ -162,4 +173,6 @@ public interface Storage {
|
||||
* @return the task storage context
|
||||
*/
|
||||
Optional<StorageContext.Task> getTaskStorageContext();
|
||||
|
||||
List<FileAttributes> list(URI uri) throws IOException;
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@@ -52,7 +53,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
* @return an InputStream to read the object's contents
|
||||
* @throws IOException if the object cannot be read
|
||||
*/
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class})
|
||||
InputStream get(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
@@ -64,7 +65,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
* @return an InputStream to read the object's contents
|
||||
* @throws IOException if the object cannot be read
|
||||
*/
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class})
|
||||
InputStream getInstanceResource(@Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
@@ -76,7 +77,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
* @return the storage object with metadata
|
||||
* @throws IOException if the object cannot be retrieved
|
||||
*/
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class})
|
||||
StorageObject getWithMetadata(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
@@ -89,7 +90,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
* @return a list of matching object URIs
|
||||
* @throws IOException if the listing fails
|
||||
*/
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class})
|
||||
List<URI> allByPrefix(String tenantId, @Nullable String namespace, URI prefix, boolean includeDirectories) throws IOException;
|
||||
|
||||
/**
|
||||
@@ -101,7 +102,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
* @return a list of file attributes
|
||||
* @throws IOException if the listing fails
|
||||
*/
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class})
|
||||
List<FileAttributes> list(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
@@ -113,7 +114,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
* @return a list of file attributes
|
||||
* @throws IOException if the listing fails
|
||||
*/
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class})
|
||||
List<FileAttributes> listInstanceResource(@Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
@@ -159,7 +160,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
* @return the file attributes
|
||||
* @throws IOException if the attributes cannot be retrieved
|
||||
*/
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class})
|
||||
FileAttributes getAttributes(String tenantId, @Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
@@ -171,7 +172,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
* @return the file attributes
|
||||
* @throws IOException if the attributes cannot be retrieved
|
||||
*/
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class})
|
||||
FileAttributes getInstanceAttributes(@Nullable String namespace, URI uri) throws IOException;
|
||||
|
||||
/**
|
||||
@@ -288,7 +289,7 @@ public interface StorageInterface extends AutoCloseable, Plugin {
|
||||
* @return the URI of the moved object
|
||||
* @throws IOException if moving fails
|
||||
*/
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class})
|
||||
@Retryable(includes = {IOException.class}, excludes = {FileNotFoundException.class, NoSuchFileException.class})
|
||||
URI move(String tenantId, @Nullable String namespace, URI from, URI to) throws IOException;
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package io.kestra.core.test.flow;
|
||||
|
||||
import io.kestra.core.models.assets.Asset;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -8,6 +9,7 @@ import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Getter
|
||||
@@ -25,5 +27,7 @@ public class TaskFixture {
|
||||
|
||||
private Map<String, Object> outputs;
|
||||
|
||||
private List<Asset> assets;
|
||||
|
||||
private Property<String> description;
|
||||
}
|
||||
|
||||
@@ -6,18 +6,18 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BooleanSupplier;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* @deprecated use {@link org.awaitility.Awaitility} instead
|
||||
*/
|
||||
@Deprecated
|
||||
public class Await {
|
||||
private static final Duration defaultSleep = Duration.ofMillis(100);
|
||||
|
||||
public static void until(BooleanSupplier condition) {
|
||||
Await.untilWithSleepInterval(condition, null);
|
||||
Await.until(condition, null);
|
||||
}
|
||||
|
||||
public static void untilWithTimeout(BooleanSupplier condition, Duration timeout) throws TimeoutException {
|
||||
until(condition, null, timeout);
|
||||
}
|
||||
|
||||
public static void untilWithSleepInterval(BooleanSupplier condition, Duration sleep) {
|
||||
public static void until(BooleanSupplier condition, Duration sleep) {
|
||||
if (sleep == null) {
|
||||
sleep = defaultSleep;
|
||||
}
|
||||
@@ -78,10 +78,10 @@ public class Await {
|
||||
return result.get();
|
||||
}
|
||||
|
||||
public static <T> T untilWithSleepInterval(Supplier<T> supplier, Duration sleep) {
|
||||
public static <T> T until(Supplier<T> supplier, Duration sleep) {
|
||||
AtomicReference<T> result = new AtomicReference<>();
|
||||
|
||||
Await.untilWithSleepInterval(untilSupplier(supplier, result), sleep);
|
||||
Await.until(untilSupplier(supplier, result), sleep);
|
||||
|
||||
return result.get();
|
||||
}
|
||||
|
||||
@@ -1,13 +1,39 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import io.kestra.core.models.Setting;
|
||||
import io.kestra.core.repositories.SettingRepositoryInterface;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
@Singleton
|
||||
public class EditionProvider {
|
||||
public Edition get() {
|
||||
return Edition.OSS;
|
||||
}
|
||||
|
||||
@Inject
|
||||
private Optional<SettingRepositoryInterface> settingRepository; // repositories are not always there on unit tests
|
||||
|
||||
@PostConstruct
|
||||
void start() {
|
||||
// check the edition in the settings and update if needed, we didn't use it would allow us to detect incompatible update later if needed
|
||||
settingRepository.ifPresent(settingRepositoryInterface -> persistEdition(settingRepositoryInterface, get()));
|
||||
}
|
||||
|
||||
private void persistEdition(SettingRepositoryInterface settingRepositoryInterface, Edition edition) {
|
||||
Optional<Setting> versionSetting = settingRepositoryInterface.findByKey(Setting.INSTANCE_EDITION);
|
||||
if (versionSetting.isEmpty() || !versionSetting.get().getValue().equals(edition)) {
|
||||
settingRepositoryInterface.save(Setting.builder()
|
||||
.key(Setting.INSTANCE_EDITION)
|
||||
.value(edition)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public enum Edition {
|
||||
OSS,
|
||||
EE
|
||||
|
||||
@@ -11,11 +11,14 @@ import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Utility class to create {@link java.util.concurrent.ExecutorService} with {@link java.util.concurrent.ExecutorService} instances.
|
||||
* WARNING: those instances will use the {@link ThreadUncaughtExceptionHandler} which terminates Kestra if an error occurs in any thread,
|
||||
* so it should not be used inside plugins.
|
||||
*/
|
||||
@Singleton
|
||||
@Slf4j
|
||||
public class ExecutorsUtils {
|
||||
@Inject
|
||||
private ThreadMainFactoryBuilder threadFactoryBuilder;
|
||||
|
||||
@Inject
|
||||
private MeterRegistry meterRegistry;
|
||||
@@ -24,7 +27,7 @@ public class ExecutorsUtils {
|
||||
return this.wrap(
|
||||
name,
|
||||
Executors.newCachedThreadPool(
|
||||
threadFactoryBuilder.build(name + "_%d")
|
||||
ThreadMainFactoryBuilder.build(name + "_%d")
|
||||
)
|
||||
);
|
||||
}
|
||||
@@ -36,7 +39,7 @@ public class ExecutorsUtils {
|
||||
60L,
|
||||
TimeUnit.SECONDS,
|
||||
new LinkedBlockingQueue<>(),
|
||||
threadFactoryBuilder.build(name + "_%d")
|
||||
ThreadMainFactoryBuilder.build(name + "_%d")
|
||||
);
|
||||
|
||||
threadPoolExecutor.allowCoreThreadTimeOut(true);
|
||||
@@ -51,7 +54,7 @@ public class ExecutorsUtils {
|
||||
return this.wrap(
|
||||
name,
|
||||
Executors.newSingleThreadExecutor(
|
||||
threadFactoryBuilder.build(name + "_%d")
|
||||
ThreadMainFactoryBuilder.build(name + "_%d")
|
||||
)
|
||||
);
|
||||
}
|
||||
@@ -60,7 +63,7 @@ public class ExecutorsUtils {
|
||||
return this.wrap(
|
||||
name,
|
||||
Executors.newSingleThreadScheduledExecutor(
|
||||
threadFactoryBuilder.build(name + "_%d")
|
||||
ThreadMainFactoryBuilder.build(name + "_%d")
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -65,10 +65,17 @@ public class ListUtils {
|
||||
}
|
||||
|
||||
public static List<String> convertToListString(Object object){
|
||||
if (object instanceof List<?> list && (list.isEmpty() || list.getFirst() instanceof String)) {
|
||||
return (List<String>) list;
|
||||
} else {
|
||||
throw new IllegalArgumentException("%s in not an instance of List of String".formatted(object));
|
||||
return convertToList(object)
|
||||
.stream()
|
||||
.map(Object::toString)
|
||||
.toList();
|
||||
}
|
||||
|
||||
public static <T> List<List<T>> partition(List<T> list, int size) {
|
||||
List<List<T>> parts = new ArrayList<>();
|
||||
for (int i = 0; i < list.size(); i += size) {
|
||||
parts.add(list.subList(i, Math.min(i + size, list.size())));
|
||||
}
|
||||
return parts;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
/**
|
||||
* Utility class for logging
|
||||
* Utility class for server logging
|
||||
*/
|
||||
public final class Logs {
|
||||
|
||||
@@ -18,7 +18,7 @@ public final class Logs {
|
||||
private static final String EXECUTION_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[execution: {}] ";
|
||||
private static final String TRIGGER_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[trigger: {}] ";
|
||||
private static final String TASKRUN_PREFIX_WITH_TENANT = FLOW_PREFIX_WITH_TENANT + "[task: {}] [execution: {}] [taskrun: {}] ";
|
||||
|
||||
|
||||
private Logs() {}
|
||||
|
||||
public static void logExecution(FlowId flow, Logger logger, Level level, String message, Object... args) {
|
||||
@@ -29,7 +29,7 @@ public final class Logs {
|
||||
}
|
||||
|
||||
/**
|
||||
* Log an {@link Execution} via the execution logger named: 'execution.{flowId}'.
|
||||
* Log an {@link Execution} via the executor logger named: 'executor.{tenantId}.{namespace}.{flowId}'.
|
||||
*/
|
||||
public static void logExecution(Execution execution, Level level, String message, Object... args) {
|
||||
Logger logger = logger(execution);
|
||||
@@ -43,7 +43,7 @@ public final class Logs {
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a {@link TriggerContext} via the trigger logger named: 'trigger.{flowId}.{triggereId}'.
|
||||
* Log a {@link TriggerContext} via the scheduler logger named: 'trigger.{tenantId}.{namespace}.{flowId}.{triggerId}'.
|
||||
*/
|
||||
public static void logTrigger(TriggerContext triggerContext, Level level, String message, Object... args) {
|
||||
Logger logger = logger(triggerContext);
|
||||
@@ -57,7 +57,7 @@ public final class Logs {
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a {@link TaskRun} via the taskRun logger named: 'task.{flowId}.{taskId}'.
|
||||
* Log a {@link TaskRun} via the worker logger named: 'worker.{tenantId}.{namespace}.{flowId}.{taskId}'.
|
||||
*/
|
||||
public static void logTaskRun(TaskRun taskRun, Level level, String message, Object... args) {
|
||||
String prefix = TASKRUN_PREFIX_WITH_TENANT;
|
||||
@@ -73,19 +73,19 @@ public final class Logs {
|
||||
|
||||
private static Logger logger(TaskRun taskRun) {
|
||||
return LoggerFactory.getLogger(
|
||||
"task." + taskRun.getFlowId() + "." + taskRun.getTaskId()
|
||||
"worker." + taskRun.getTenantId() + "." + taskRun.getNamespace() + "." + taskRun.getFlowId() + "." + taskRun.getTaskId()
|
||||
);
|
||||
}
|
||||
|
||||
private static Logger logger(TriggerContext triggerContext) {
|
||||
return LoggerFactory.getLogger(
|
||||
"trigger." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
|
||||
"scheduler." + triggerContext.getTenantId() + "." + triggerContext.getNamespace() + "." + triggerContext.getFlowId() + "." + triggerContext.getTriggerId()
|
||||
);
|
||||
}
|
||||
|
||||
private static Logger logger(Execution execution) {
|
||||
return LoggerFactory.getLogger(
|
||||
"execution." + execution.getFlowId()
|
||||
"executor." + execution.getTenantId() + "." + execution.getNamespace() + "." + execution.getFlowId()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,7 +120,10 @@ public class MapUtils {
|
||||
private static Collection<?> mergeCollections(Collection<?> colA, Collection<?> colB) {
|
||||
List<Object> merged = new ArrayList<>(colA.size() + colB.size());
|
||||
merged.addAll(colA);
|
||||
merged.addAll(colB);
|
||||
if (!colB.isEmpty()) {
|
||||
List<?> filtered = colB.stream().filter(it -> !colA.contains(it)).toList();
|
||||
merged.addAll(filtered);
|
||||
}
|
||||
return merged;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
package io.kestra.core.utils;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import io.kestra.core.models.executions.metrics.Counter;
|
||||
import io.kestra.core.models.executions.metrics.Timer;
|
||||
import io.kestra.core.models.tasks.FileExistComportment;
|
||||
import io.kestra.core.models.tasks.NamespaceFiles;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.storages.NamespaceFile;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.time.DurationFormatUtils;
|
||||
import org.apache.commons.lang3.time.StopWatch;
|
||||
@@ -19,26 +17,27 @@ import java.io.InputStream;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
|
||||
@Singleton
|
||||
public class NamespaceFilesUtils {
|
||||
@Inject
|
||||
private ExecutorsUtils executorsUtils;
|
||||
public final class NamespaceFilesUtils {
|
||||
private static final int maxThreads = Math.max(Runtime.getRuntime().availableProcessors() * 4, 32);
|
||||
private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
|
||||
0,
|
||||
maxThreads,
|
||||
60L,
|
||||
TimeUnit.SECONDS,
|
||||
new SynchronousQueue<>(),
|
||||
new ThreadFactoryBuilder().setNameFormat("namespace-files").build()
|
||||
);;
|
||||
|
||||
private ExecutorService executorService;
|
||||
|
||||
@PostConstruct
|
||||
public void postConstruct() {
|
||||
this.executorService = executorsUtils.maxCachedThreadPool(Math.max(Runtime.getRuntime().availableProcessors() * 4, 32), "namespace-file");
|
||||
private NamespaceFilesUtils() {
|
||||
// utility class pattern
|
||||
}
|
||||
|
||||
public void loadNamespaceFiles(
|
||||
public static void loadNamespaceFiles(
|
||||
RunContext runContext,
|
||||
NamespaceFiles namespaceFiles
|
||||
)
|
||||
@@ -63,7 +62,11 @@ public class NamespaceFilesUtils {
|
||||
matchedNamespaceFiles.addAll(files);
|
||||
}
|
||||
|
||||
// Use half of the available threads to avoid impacting concurrent tasks
|
||||
int parallelism = maxThreads / 2;
|
||||
Flux.fromIterable(matchedNamespaceFiles)
|
||||
.parallel(parallelism)
|
||||
.runOn(Schedulers.fromExecutorService(EXECUTOR_SERVICE))
|
||||
.doOnNext(throwConsumer(nsFile -> {
|
||||
InputStream content = runContext.storage().getFile(nsFile.uri());
|
||||
Path path = folderPerNamespace ?
|
||||
@@ -71,7 +74,7 @@ public class NamespaceFilesUtils {
|
||||
Path.of(nsFile.path());
|
||||
runContext.workingDir().putFile(path, content, fileExistComportment);
|
||||
}))
|
||||
.publishOn(Schedulers.fromExecutorService(executorService))
|
||||
.sequential()
|
||||
.blockLast();
|
||||
|
||||
Duration duration = stopWatch.getDuration();
|
||||
|
||||
@@ -17,36 +17,36 @@ import org.slf4j.Logger;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.List;
|
||||
import java.util.function.BiPredicate;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import jakarta.inject.Singleton;
|
||||
public final class RetryUtils {
|
||||
private RetryUtils() {
|
||||
// utility class pattern
|
||||
}
|
||||
|
||||
@Singleton
|
||||
public class RetryUtils {
|
||||
public <T, E extends Throwable> Instance<T, E> of() {
|
||||
public static <T, E extends Throwable> Instance<T, E> of() {
|
||||
return Instance.<T, E>builder()
|
||||
.build();
|
||||
}
|
||||
|
||||
public <T, E extends Throwable> Instance<T, E> of(AbstractRetry policy) {
|
||||
public static <T, E extends Throwable> Instance<T, E> of(AbstractRetry policy) {
|
||||
return Instance.<T, E>builder()
|
||||
.policy(policy)
|
||||
.build();
|
||||
}
|
||||
|
||||
public <T, E extends Throwable> Instance<T, E> of(AbstractRetry policy, Function<RetryFailed, E> failureFunction) {
|
||||
public static <T, E extends Throwable> Instance<T, E> of(AbstractRetry policy, Function<RetryFailed, E> failureFunction) {
|
||||
return Instance.<T, E>builder()
|
||||
.policy(policy)
|
||||
.failureFunction(failureFunction)
|
||||
.build();
|
||||
}
|
||||
|
||||
public <T, E extends Throwable> Instance<T, E> of(AbstractRetry policy, Logger logger) {
|
||||
public static <T, E extends Throwable> Instance<T, E> of(AbstractRetry policy, Logger logger) {
|
||||
return Instance.<T, E>builder()
|
||||
.policy(policy)
|
||||
.logger(logger)
|
||||
@@ -199,7 +199,6 @@ public class RetryUtils {
|
||||
|
||||
private final int attemptCount;
|
||||
private final Duration elapsedTime;
|
||||
private final Instant startTime;
|
||||
|
||||
public <T> RetryFailed(ExecutionAttemptedEvent<? extends T> event) {
|
||||
super(
|
||||
@@ -210,7 +209,6 @@ public class RetryUtils {
|
||||
|
||||
this.attemptCount = event.getAttemptCount();
|
||||
this.elapsedTime = event.getElapsedTime();
|
||||
this.startTime = event.getStartTime().get();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user