mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-25 11:12:12 -05:00
Compare commits
381 Commits
chore/noCo
...
develop
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb31e48f4f | ||
|
|
a3f96a2741 | ||
|
|
5ca6fa8d77 | ||
|
|
a3a206f3c4 | ||
|
|
31f1e505e3 | ||
|
|
75e0c1d11f | ||
|
|
4cf883877d | ||
|
|
87b1e8fb01 | ||
|
|
b5c6101090 | ||
|
|
e4323728d6 | ||
|
|
5495971ecf | ||
|
|
dec1ee4272 | ||
|
|
69cc6b2715 | ||
|
|
431b4ccdb9 | ||
|
|
e9207a6f53 | ||
|
|
419c1041d5 | ||
|
|
02cd5efb05 | ||
|
|
2d549940c4 | ||
|
|
97e138fbae | ||
|
|
d48f3b9bd9 | ||
|
|
291fba3281 | ||
|
|
db3b3236ac | ||
|
|
5a8a631b47 | ||
|
|
2da191896f | ||
|
|
111026369b | ||
|
|
e3a0e59e9c | ||
|
|
f352be5746 | ||
|
|
5fc6d0b5d7 | ||
|
|
de5750f656 | ||
|
|
fa870b8df2 | ||
|
|
fa4bf64a23 | ||
|
|
27f81b5b6d | ||
|
|
90d322cd67 | ||
|
|
04246ace13 | ||
|
|
e0e745cb91 | ||
|
|
c69ecd7200 | ||
|
|
4b3419bc15 | ||
|
|
352d4eb194 | ||
|
|
e433833e62 | ||
|
|
d16a8de90f | ||
|
|
4784e459d6 | ||
|
|
2abea0fcde | ||
|
|
5d5165b7b9 | ||
|
|
44d0c10713 | ||
|
|
167734e32a | ||
|
|
24e61c81c0 | ||
|
|
379764a033 | ||
|
|
d55dd275c3 | ||
|
|
f409657e8a | ||
|
|
22f0b3ffdf | ||
|
|
0d99dc6862 | ||
|
|
fd3adc48b8 | ||
|
|
1a8a47c8cd | ||
|
|
7ea95f393e | ||
|
|
6935900699 | ||
|
|
0bc8e8d74a | ||
|
|
7f77b24ae0 | ||
|
|
ec6820dc25 | ||
|
|
d94193c143 | ||
|
|
c9628047fa | ||
|
|
4cbc069af4 | ||
|
|
eabe573fe6 | ||
|
|
ecd64617c3 | ||
|
|
a5650bca0f | ||
|
|
ed59e262d4 | ||
|
|
a5f9d54f7d | ||
|
|
47f4f43198 | ||
|
|
5d31c97f7f | ||
|
|
f8107285c4 | ||
|
|
8dc8dc1796 | ||
|
|
834dfd2947 | ||
|
|
6edb88841f | ||
|
|
5653531628 | ||
|
|
ee61276106 | ||
|
|
abcf76f7b4 | ||
|
|
67ada7f61b | ||
|
|
0c13633f77 | ||
|
|
a6cf2015ff | ||
|
|
2f9216c70b | ||
|
|
1903e6fac5 | ||
|
|
2d2cb00cab | ||
|
|
01b5441d16 | ||
|
|
efc778e294 | ||
|
|
60235a4e73 | ||
|
|
b167c52e76 | ||
|
|
216b124294 | ||
|
|
b6e4df8de2 | ||
|
|
429e7c7945 | ||
|
|
e302b4be4a | ||
|
|
8e7ad9ae25 | ||
|
|
41a11abf16 | ||
|
|
1be16d5e9d | ||
|
|
e263224d7b | ||
|
|
12b89588a6 | ||
|
|
eae5eb80cb | ||
|
|
c0f6298484 | ||
|
|
ba1d6b2232 | ||
|
|
048dcb80cc | ||
|
|
a81de811d7 | ||
|
|
a960a9f982 | ||
|
|
c4d4fd935f | ||
|
|
f063a5a2d9 | ||
|
|
ac91d5605f | ||
|
|
e3d3c3651b | ||
|
|
5b6836237e | ||
|
|
2f8284b133 | ||
|
|
42992fd7c3 | ||
|
|
3a481f93d3 | ||
|
|
7e964ae563 | ||
|
|
25e54edbc9 | ||
|
|
e88dc7af76 | ||
|
|
b7a027f0dc | ||
|
|
98141d6010 | ||
|
|
bf119ab6df | ||
|
|
9bd6353b77 | ||
|
|
c0ab581cf1 | ||
|
|
0f38e19663 | ||
|
|
0c14ea621c | ||
|
|
fb14e57a7c | ||
|
|
09c707d865 | ||
|
|
86e08d71dd | ||
|
|
94c00cedeb | ||
|
|
eb12832b1e | ||
|
|
687cefdfb9 | ||
|
|
8eae8aba72 | ||
|
|
abdbb8d364 | ||
|
|
8a55ab3af6 | ||
|
|
b7cb933e1e | ||
|
|
3af003e5e4 | ||
|
|
c3861a5532 | ||
|
|
ae1f10f45a | ||
|
|
612dccfb8c | ||
|
|
2ae8df2f5f | ||
|
|
1abfa74a16 | ||
|
|
69a793b227 | ||
|
|
35ccb3e39b | ||
|
|
3a7fcb2aa1 | ||
|
|
103c5b92e9 | ||
|
|
5253eeef95 | ||
|
|
848f835191 | ||
|
|
3e55e67534 | ||
|
|
7bca8b4924 | ||
|
|
56febfb415 | ||
|
|
925b8c6954 | ||
|
|
708816fe67 | ||
|
|
5502473fa4 | ||
|
|
c6cf0147a4 | ||
|
|
2951f4b4bc | ||
|
|
4ea13e258b | ||
|
|
3f8dcb47fd | ||
|
|
42dc3b930c | ||
|
|
97a78abd28 | ||
|
|
b3b2ef1b5a | ||
|
|
596a26a137 | ||
|
|
8a9a1df436 | ||
|
|
55d0880ed3 | ||
|
|
a74ebd5cd6 | ||
|
|
f3aed38964 | ||
|
|
2595e56199 | ||
|
|
e821bd7f65 | ||
|
|
09762d2a8d | ||
|
|
018c22918f | ||
|
|
3e9c8cf7da | ||
|
|
008404e442 | ||
|
|
2b224bcde8 | ||
|
|
1977b61693 | ||
|
|
8e2267f86c | ||
|
|
24355c2a88 | ||
|
|
51adcfa908 | ||
|
|
a55baa1f96 | ||
|
|
32793fde18 | ||
|
|
4381d585ec | ||
|
|
e595e26c45 | ||
|
|
b833cf28b5 | ||
|
|
ac11e9545c | ||
|
|
a07df5f6cd | ||
|
|
f626c85346 | ||
|
|
e15b53ebb5 | ||
|
|
7edb6bc379 | ||
|
|
78c81f932b | ||
|
|
56bb3ca29c | ||
|
|
14029e8c14 | ||
|
|
bea3d63d89 | ||
|
|
24a3bbd303 | ||
|
|
f9932af2e8 | ||
|
|
e0410c8f24 | ||
|
|
424a6cb41a | ||
|
|
afde71e913 | ||
|
|
086c32e711 | ||
|
|
710abcfaac | ||
|
|
be951d015c | ||
|
|
a07260bef4 | ||
|
|
dd19f8391d | ||
|
|
354873e220 | ||
|
|
386d4a15f0 | ||
|
|
1b75f15680 | ||
|
|
957bf74d97 | ||
|
|
3cbad1ce0d | ||
|
|
760050e9fc | ||
|
|
43f47ec337 | ||
|
|
ad5521199a | ||
|
|
fe7849d7fe | ||
|
|
feeaeff0b2 | ||
|
|
ed6bc50163 | ||
|
|
069845f579 | ||
|
|
f613eb0433 | ||
|
|
e440c402b4 | ||
|
|
700527b5dc | ||
|
|
5245014a32 | ||
|
|
5db0f44fb6 | ||
|
|
a8635108b7 | ||
|
|
cd4470044e | ||
|
|
4ec7f23a7b | ||
|
|
107ba16ce3 | ||
|
|
042d548598 | ||
|
|
94bd6f0a1e | ||
|
|
f43f11e125 | ||
|
|
3dfa5f97c4 | ||
|
|
8898ba736b | ||
|
|
f5665bf719 | ||
|
|
b6db688003 | ||
|
|
93f5e366ed | ||
|
|
0465ffa5df | ||
|
|
e869c54883 | ||
|
|
32da15b2ea | ||
|
|
a72ecfc2eb | ||
|
|
7cb494b244 | ||
|
|
9d21ab4b26 | ||
|
|
8f3a5058b1 | ||
|
|
56fb304ff6 | ||
|
|
28370d80df | ||
|
|
cc5fd30b2c | ||
|
|
61c39d23c5 | ||
|
|
b5a40b2fcc | ||
|
|
825b9dbcdb | ||
|
|
393b132444 | ||
|
|
b0ce760e50 | ||
|
|
2d68dad70c | ||
|
|
d08a2d8930 | ||
|
|
2722735d2d | ||
|
|
e9b7d190d4 | ||
|
|
d6cfa01fd5 | ||
|
|
d8e3a9bd44 | ||
|
|
b18e3b76ef | ||
|
|
4660091dc9 | ||
|
|
067414ffbe | ||
|
|
26f6154eed | ||
|
|
ea44128d2b | ||
|
|
4602546045 | ||
|
|
aecd050314 | ||
|
|
d6c290cb91 | ||
|
|
56216ef0b4 | ||
|
|
e1f983cc2d | ||
|
|
68f92e1159 | ||
|
|
5b597b9520 | ||
|
|
b0606a4380 | ||
|
|
06450bfd65 | ||
|
|
ce12e19f99 | ||
|
|
e20d67f4f2 | ||
|
|
4d353937c3 | ||
|
|
8edad60695 | ||
|
|
c0ecc2cb20 | ||
|
|
682d258e7b | ||
|
|
d20f7039c7 | ||
|
|
4e1b53fadf | ||
|
|
2191331750 | ||
|
|
90c3281eae | ||
|
|
9fa94deba9 | ||
|
|
9d73d72ab0 | ||
|
|
4799ee320f | ||
|
|
40880bf7d8 | ||
|
|
7aca309be5 | ||
|
|
64899f3103 | ||
|
|
fbe6df34ca | ||
|
|
df21ef4064 | ||
|
|
64a2c3b746 | ||
|
|
f06b1c5347 | ||
|
|
ef154bb029 | ||
|
|
496e01eb3e | ||
|
|
f2c15185fb | ||
|
|
20c5328199 | ||
|
|
91330496f2 | ||
|
|
101700ac53 | ||
|
|
36389d7d79 | ||
|
|
f29dbe53a8 | ||
|
|
a6d34151bf | ||
|
|
4e54fac980 | ||
|
|
6e50654544 | ||
|
|
d146ebfb01 | ||
|
|
e353399d47 | ||
|
|
038083cdf4 | ||
|
|
568e66c75e | ||
|
|
5a8552ad36 | ||
|
|
da323d792a | ||
|
|
659731813a | ||
|
|
b8b20e76ba | ||
|
|
cf0b551f8f | ||
|
|
84840fe090 | ||
|
|
0dba0367f7 | ||
|
|
905341c185 | ||
|
|
b33fbc284d | ||
|
|
71f1bb9477 | ||
|
|
491e286eee | ||
|
|
469e230ebd | ||
|
|
ebb86f6d19 | ||
|
|
b68dcb7bf5 | ||
|
|
65786343ef | ||
|
|
d6933b8e49 | ||
|
|
8bd5593b2d | ||
|
|
af87713258 | ||
|
|
371c1281ca | ||
|
|
6a111a676c | ||
|
|
15da58dbf4 | ||
|
|
e37e2b0166 | ||
|
|
9f90412237 | ||
|
|
c3d94dc8ff | ||
|
|
98678deabb | ||
|
|
248c2154a2 | ||
|
|
546039e30a | ||
|
|
27bcb9c347 | ||
|
|
3f7b6a0e72 | ||
|
|
aeca59a3e4 | ||
|
|
d7caf9ae00 | ||
|
|
b5efc27763 | ||
|
|
4909978f7f | ||
|
|
f8740871ec | ||
|
|
187319ad54 | ||
|
|
9459d6556b | ||
|
|
a9d1e9ac4d | ||
|
|
c6c62dbe47 | ||
|
|
8f4bafc666 | ||
|
|
e46fbe480e | ||
|
|
7fd16b24e0 | ||
|
|
51529c8ead | ||
|
|
f53135a856 | ||
|
|
bd4eebed32 | ||
|
|
f2e7283c72 | ||
|
|
e31e833ce6 | ||
|
|
e7a99bb37f | ||
|
|
1fd4bf7499 | ||
|
|
c5851ce254 | ||
|
|
1f1976099e | ||
|
|
6a8e6b414b | ||
|
|
80d81820c9 | ||
|
|
0a62957f05 | ||
|
|
2c46bc0c39 | ||
|
|
f0189c32fc | ||
|
|
e6058f3d3e | ||
|
|
a5ec12c62a | ||
|
|
5439d395b1 | ||
|
|
bb363f8832 | ||
|
|
865aaa1fde | ||
|
|
116e5aad2d | ||
|
|
5860ce73bb | ||
|
|
527d80cd74 | ||
|
|
c99bd1d4ea | ||
|
|
c4a6ea617f | ||
|
|
a4b0beaf63 | ||
|
|
a5847aeb3a | ||
|
|
49bbc15d91 | ||
|
|
9d6694f807 | ||
|
|
eb51c5be37 | ||
|
|
90ee720d49 | ||
|
|
fd259082a6 | ||
|
|
b5323f969c | ||
|
|
6c826e93c8 | ||
|
|
aae3e6605d | ||
|
|
ea17077b0a | ||
|
|
117200eaab | ||
|
|
3216611828 | ||
|
|
1173eb2dde | ||
|
|
360b58a851 | ||
|
|
57e288abdd | ||
|
|
7fa14eb3f5 | ||
|
|
0ed2b0a53c | ||
|
|
68ace7a59b | ||
|
|
105b1b36e5 | ||
|
|
15e82f65c6 | ||
|
|
aec75bb673 | ||
|
|
f489678532 | ||
|
|
79fc5a3f24 |
4
.github/CONTRIBUTING.md
vendored
4
.github/CONTRIBUTING.md
vendored
@@ -63,9 +63,9 @@ You can also build it from a terminal using `./gradlew build`, the Gradle wrappe
|
||||
- Configure the following environment variables:
|
||||
- `MICRONAUT_ENVIRONMENTS`: can be set to any string and will load a custom configuration file in `cli/src/main/resources/application-{env}.yml`.
|
||||
- `KESTRA_PLUGINS_PATH`: is the path where you will save plugins as Jar and will be load on startup.
|
||||
- See the screenshot below for an example: 
|
||||
- See the screenshot below for an example: 
|
||||
- If you encounter **JavaScript memory heap out** error during startup, configure `NODE_OPTIONS` environment variable with some large value.
|
||||
- Example `NODE_OPTIONS: --max-old-space-size=4096` or `NODE_OPTIONS: --max-old-space-size=8192` 
|
||||
- Example `NODE_OPTIONS: --max-old-space-size=4096` or `NODE_OPTIONS: --max-old-space-size=8192` 
|
||||
- The server starts by default on port 8080 and is reachable on `http://localhost:8080`
|
||||
|
||||
If you want to launch all tests, you need Python and some packages installed on your machine, on Ubuntu you can install them with:
|
||||
|
||||
1
.github/ISSUE_TEMPLATE/bug.yml
vendored
1
.github/ISSUE_TEMPLATE/bug.yml
vendored
@@ -2,6 +2,7 @@ name: Bug report
|
||||
description: Report a bug or unexpected behavior in the project
|
||||
|
||||
labels: ["bug", "area/backend", "area/frontend"]
|
||||
type: Bug
|
||||
|
||||
body:
|
||||
- type: markdown
|
||||
|
||||
1
.github/ISSUE_TEMPLATE/feature.yml
vendored
1
.github/ISSUE_TEMPLATE/feature.yml
vendored
@@ -2,6 +2,7 @@ name: Feature request
|
||||
description: Suggest a new feature or improvement to enhance the project
|
||||
|
||||
labels: ["enhancement", "area/backend", "area/frontend"]
|
||||
type: Feature
|
||||
|
||||
body:
|
||||
- type: textarea
|
||||
|
||||
|
Before Width: | Height: | Size: 130 KiB After Width: | Height: | Size: 130 KiB |
|
Before Width: | Height: | Size: 210 KiB After Width: | Height: | Size: 210 KiB |
99
.github/dependabot.yml
vendored
99
.github/dependabot.yml
vendored
@@ -26,7 +26,7 @@ updates:
|
||||
open-pull-requests-limit: 50
|
||||
labels: ["dependency-upgrade", "area/backend"]
|
||||
ignore:
|
||||
# Ignore versions of Protobuf that are equal to or greater than 4.0.0 as Orc still uses 3
|
||||
# Ignore versions of Protobuf >= 4.0.0 because Orc still uses version 3
|
||||
- dependency-name: "com.google.protobuf:*"
|
||||
versions: ["[4,)"]
|
||||
|
||||
@@ -44,68 +44,75 @@ updates:
|
||||
build:
|
||||
applies-to: version-updates
|
||||
patterns: ["@esbuild/*", "@rollup/*", "@swc/*"]
|
||||
|
||||
types:
|
||||
applies-to: version-updates
|
||||
patterns: ["@types/*"]
|
||||
|
||||
storybook:
|
||||
applies-to: version-updates
|
||||
patterns: ["@storybook/*"]
|
||||
patterns: ["storybook*", "@storybook/*", "eslint-plugin-storybook"]
|
||||
|
||||
vitest:
|
||||
applies-to: version-updates
|
||||
patterns: ["vitest", "@vitest/*"]
|
||||
patch:
|
||||
|
||||
major:
|
||||
update-types: ["major"]
|
||||
applies-to: version-updates
|
||||
exclude-patterns: [
|
||||
"@esbuild/*",
|
||||
"@rollup/*",
|
||||
"@swc/*",
|
||||
"@types/*",
|
||||
"storybook*",
|
||||
"@storybook/*",
|
||||
"eslint-plugin-storybook",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
# Temporary exclusion of these packages from major updates
|
||||
"eslint-plugin-vue",
|
||||
]
|
||||
|
||||
minor:
|
||||
update-types: ["minor"]
|
||||
applies-to: version-updates
|
||||
exclude-patterns: [
|
||||
"@esbuild/*",
|
||||
"@rollup/*",
|
||||
"@swc/*",
|
||||
"@types/*",
|
||||
"storybook*",
|
||||
"@storybook/*",
|
||||
"eslint-plugin-storybook",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
# Temporary exclusion of these packages from minor updates
|
||||
"moment-timezone",
|
||||
"monaco-editor",
|
||||
]
|
||||
|
||||
patch:
|
||||
update-types: ["patch"]
|
||||
applies-to: version-updates
|
||||
patterns: ["*"]
|
||||
exclude-patterns:
|
||||
[
|
||||
"@esbuild/*",
|
||||
"@rollup/*",
|
||||
"@swc/*",
|
||||
"@types/*",
|
||||
"storybook*",
|
||||
"@storybook/*",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
]
|
||||
update-types: ["patch"]
|
||||
minor:
|
||||
applies-to: version-updates
|
||||
patterns: ["*"]
|
||||
exclude-patterns: [
|
||||
"@esbuild/*",
|
||||
"@rollup/*",
|
||||
"@swc/*",
|
||||
"@types/*",
|
||||
"@storybook/*",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
# Temporary exclusion of packages below from minor updates
|
||||
"moment-timezone",
|
||||
"monaco-editor",
|
||||
]
|
||||
update-types: ["minor"]
|
||||
major:
|
||||
applies-to: version-updates
|
||||
patterns: ["*"]
|
||||
exclude-patterns: [
|
||||
"@esbuild/*",
|
||||
"@rollup/*",
|
||||
"@swc/*",
|
||||
"@types/*",
|
||||
"@storybook/*",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
# Temporary exclusion of packages below from major updates
|
||||
"eslint-plugin-storybook",
|
||||
"eslint-plugin-vue",
|
||||
"vitest",
|
||||
"@vitest/*",
|
||||
]
|
||||
update-types: ["major"]
|
||||
ignore:
|
||||
# Ignore updates to monaco-yaml, version is pinned to 5.3.1 due to patch-package script additions
|
||||
- dependency-name: "monaco-yaml"
|
||||
versions:
|
||||
- ">=5.3.2"
|
||||
|
||||
# Ignore updates of version 1.x, as we're using the beta of 2.x (still in beta)
|
||||
ignore:
|
||||
# Ignore updates to monaco-yaml; version is pinned to 5.3.1 due to patch-package script additions
|
||||
- dependency-name: "monaco-yaml"
|
||||
versions: [">=5.3.2"]
|
||||
|
||||
# Ignore updates of version 1.x for vue-virtual-scroller, as the project uses the beta of 2.x
|
||||
- dependency-name: "vue-virtual-scroller"
|
||||
versions:
|
||||
- "1.x"
|
||||
versions: ["1.x"]
|
||||
|
||||
2
.github/pull_request_template.md
vendored
2
.github/pull_request_template.md
vendored
@@ -12,7 +12,7 @@ _Example: Replaces legacy scroll directive with the new API._
|
||||
### 🔗 Related Issue
|
||||
|
||||
Which issue does this PR resolve? Use [GitHub Keywords](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/using-keywords-in-issues-and-pull-requests#linking-a-pull-request-to-an-issue) to automatically link the pull request to the issue.
|
||||
_Example: Closes https://github.com/kestra-io/kestra/issues/12345._
|
||||
_Example: Closes https://github.com/kestra-io/kestra/issues/ISSUE_NUMBER._
|
||||
|
||||
### 🎨 Frontend Checklist
|
||||
|
||||
|
||||
2
.github/workflows/auto-translate-ui-keys.yml
vendored
2
.github/workflows/auto-translate-ui-keys.yml
vendored
@@ -20,7 +20,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 10
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v6
|
||||
name: Checkout
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
2
.github/workflows/codeql-analysis.yml
vendored
2
.github/workflows/codeql-analysis.yml
vendored
@@ -27,7 +27,7 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v5
|
||||
uses: actions/checkout@v6
|
||||
with:
|
||||
# We must fetch at least the immediate parents so that if this is
|
||||
# a pull request then we can checkout the head.
|
||||
|
||||
@@ -33,7 +33,7 @@ jobs:
|
||||
exit 1;
|
||||
fi
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0
|
||||
path: kestra
|
||||
|
||||
2
.github/workflows/global-start-release.yml
vendored
2
.github/workflows/global-start-release.yml
vendored
@@ -39,7 +39,7 @@ jobs:
|
||||
|
||||
# Checkout
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v5
|
||||
uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0
|
||||
token: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
|
||||
1
.github/workflows/main-build.yml
vendored
1
.github/workflows/main-build.yml
vendored
@@ -64,6 +64,7 @@ jobs:
|
||||
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
|
||||
|
||||
publish-develop-maven:
|
||||
|
||||
1
.github/workflows/release-docker.yml
vendored
1
.github/workflows/release-docker.yml
vendored
@@ -32,3 +32,4 @@ jobs:
|
||||
DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
|
||||
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
|
||||
8
.github/workflows/vulnerabilities-check.yml
vendored
8
.github/workflows/vulnerabilities-check.yml
vendored
@@ -17,7 +17,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
@@ -43,7 +43,7 @@ jobs:
|
||||
|
||||
# Upload dependency check report
|
||||
- name: Upload dependency check report
|
||||
uses: actions/upload-artifact@v5
|
||||
uses: actions/upload-artifact@v6
|
||||
if: ${{ always() }}
|
||||
with:
|
||||
name: dependency-check-report
|
||||
@@ -58,7 +58,7 @@ jobs:
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
@@ -95,7 +95,7 @@ jobs:
|
||||
actions: read
|
||||
steps:
|
||||
# Checkout
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
|
||||
@@ -29,8 +29,8 @@ start_time2=$(date +%s)
|
||||
|
||||
echo "cd ./ui"
|
||||
cd ./ui
|
||||
echo "npm i"
|
||||
npm i
|
||||
echo "npm ci"
|
||||
npm ci
|
||||
|
||||
echo 'sh ./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"'
|
||||
./run-e2e-tests.sh --kestra-docker-image-to-test "kestra/kestra:$LOCAL_IMAGE_VERSION"
|
||||
|
||||
187
build.gradle
187
build.gradle
@@ -21,7 +21,7 @@ plugins {
|
||||
|
||||
// test
|
||||
id "com.adarshr.test-logger" version "4.0.0"
|
||||
id "org.sonarqube" version "7.0.1.6134"
|
||||
id "org.sonarqube" version "7.2.1.6560"
|
||||
id 'jacoco-report-aggregation'
|
||||
|
||||
// helper
|
||||
@@ -32,7 +32,7 @@ plugins {
|
||||
|
||||
// release
|
||||
id 'net.researchgate.release' version '3.1.0'
|
||||
id "com.gorylenko.gradle-git-properties" version "2.5.3"
|
||||
id "com.gorylenko.gradle-git-properties" version "2.5.4"
|
||||
id 'signing'
|
||||
id "com.vanniktech.maven.publish" version "0.35.0"
|
||||
|
||||
@@ -171,13 +171,22 @@ allprojects {
|
||||
subprojects {subProj ->
|
||||
|
||||
if (subProj.name != 'platform' && subProj.name != 'jmh-benchmarks') {
|
||||
|
||||
apply plugin: "com.adarshr.test-logger"
|
||||
apply plugin: 'jacoco'
|
||||
|
||||
java {
|
||||
sourceCompatibility = targetJavaVersion
|
||||
targetCompatibility = targetJavaVersion
|
||||
}
|
||||
|
||||
configurations {
|
||||
agent {
|
||||
canBeResolved = true
|
||||
canBeConsumed = true
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
// Platform
|
||||
testAnnotationProcessor enforcedPlatform(project(":platform"))
|
||||
@@ -204,9 +213,17 @@ subprojects {subProj ->
|
||||
|
||||
//assertj
|
||||
testImplementation 'org.assertj:assertj-core'
|
||||
|
||||
agent "org.aspectj:aspectjweaver:1.9.25.1"
|
||||
|
||||
testImplementation platform("io.qameta.allure:allure-bom")
|
||||
testImplementation "io.qameta.allure:allure-junit5"
|
||||
}
|
||||
|
||||
def commonTestConfig = { Test t ->
|
||||
t.ignoreFailures = true
|
||||
t.finalizedBy jacocoTestReport
|
||||
|
||||
// set Xmx for test workers
|
||||
t.maxHeapSize = '4g'
|
||||
|
||||
@@ -223,13 +240,59 @@ subprojects {subProj ->
|
||||
t.environment 'ENV_TEST2', "Pass by env"
|
||||
|
||||
|
||||
if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
|
||||
// JUnit 5 parallel settings
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
|
||||
t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
|
||||
// if (subProj.name == 'core' || subProj.name == 'jdbc-h2' || subProj.name == 'jdbc-mysql' || subProj.name == 'jdbc-postgres') {
|
||||
// // JUnit 5 parallel settings
|
||||
// t.systemProperty 'junit.jupiter.execution.parallel.enabled', 'true'
|
||||
// t.systemProperty 'junit.jupiter.execution.parallel.mode.default', 'concurrent'
|
||||
// t.systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'same_thread'
|
||||
// t.systemProperty 'junit.jupiter.execution.parallel.config.strategy', 'dynamic'
|
||||
// }
|
||||
}
|
||||
|
||||
tasks.register('integrationTest', Test) { Test t ->
|
||||
description = 'Runs integration tests'
|
||||
group = 'verification'
|
||||
|
||||
useJUnitPlatform {
|
||||
includeTags 'integration'
|
||||
}
|
||||
|
||||
testClassesDirs = sourceSets.test.output.classesDirs
|
||||
classpath = sourceSets.test.runtimeClasspath
|
||||
|
||||
reports {
|
||||
junitXml.required = true
|
||||
junitXml.outputPerTestCase = true
|
||||
junitXml.mergeReruns = true
|
||||
junitXml.includeSystemErrLog = true
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
|
||||
}
|
||||
|
||||
// Integration tests typically not parallel (but you can enable)
|
||||
maxParallelForks = 1
|
||||
commonTestConfig(t)
|
||||
}
|
||||
|
||||
tasks.register('unitTest', Test) { Test t ->
|
||||
description = 'Runs unit tests'
|
||||
group = 'verification'
|
||||
|
||||
useJUnitPlatform {
|
||||
excludeTags 'flaky', 'integration'
|
||||
}
|
||||
|
||||
testClassesDirs = sourceSets.test.output.classesDirs
|
||||
classpath = sourceSets.test.runtimeClasspath
|
||||
|
||||
reports {
|
||||
junitXml.required = true
|
||||
junitXml.outputPerTestCase = true
|
||||
junitXml.mergeReruns = true
|
||||
junitXml.includeSystemErrLog = true
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
|
||||
}
|
||||
|
||||
commonTestConfig(t)
|
||||
}
|
||||
|
||||
tasks.register('flakyTest', Test) { Test t ->
|
||||
@@ -239,7 +302,6 @@ subprojects {subProj ->
|
||||
useJUnitPlatform {
|
||||
includeTags 'flaky'
|
||||
}
|
||||
ignoreFailures = true
|
||||
|
||||
reports {
|
||||
junitXml.required = true
|
||||
@@ -249,10 +311,13 @@ subprojects {subProj ->
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/flakyTest")
|
||||
}
|
||||
commonTestConfig(t)
|
||||
|
||||
}
|
||||
|
||||
test {
|
||||
// test task (default)
|
||||
tasks.named('test', Test) { Test t ->
|
||||
group = 'verification'
|
||||
description = 'Runs all non-flaky tests.'
|
||||
|
||||
useJUnitPlatform {
|
||||
excludeTags 'flaky'
|
||||
}
|
||||
@@ -263,10 +328,12 @@ subprojects {subProj ->
|
||||
junitXml.includeSystemErrLog = true
|
||||
junitXml.outputLocation = layout.buildDirectory.dir("test-results/test")
|
||||
}
|
||||
commonTestConfig(it)
|
||||
commonTestConfig(t)
|
||||
jvmArgs = ["-javaagent:${configurations.agent.singleFile}"]
|
||||
}
|
||||
|
||||
|
||||
finalizedBy(tasks.named('flakyTest'))
|
||||
tasks.named('check') {
|
||||
dependsOn(tasks.named('test'))// default behaviour
|
||||
}
|
||||
|
||||
testlogger {
|
||||
@@ -282,83 +349,25 @@ subprojects {subProj ->
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************\
|
||||
* End-to-End Tests
|
||||
**********************************************************************************************************************/
|
||||
def e2eTestsCheck = tasks.register('e2eTestsCheck') {
|
||||
group = 'verification'
|
||||
description = "Runs the 'check' task for all e2e-tests modules"
|
||||
doFirst {
|
||||
project.ext.set("e2e-tests", true)
|
||||
}
|
||||
}
|
||||
|
||||
subprojects {
|
||||
// Add e2e-tests modules check tasks to e2eTestsCheck
|
||||
if (project.name.startsWith("e2e-tests")) {
|
||||
test {
|
||||
onlyIf {
|
||||
project.hasProperty("e2e-tests")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
afterEvaluate {
|
||||
// Add e2e-tests modules check tasks to e2eTestsCheck
|
||||
if (project.name.startsWith("e2e-tests")) {
|
||||
e2eTestsCheck.configure {
|
||||
finalizedBy(check)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************\
|
||||
* Allure Reports
|
||||
**********************************************************************************************************************/
|
||||
subprojects {
|
||||
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
|
||||
dependencies {
|
||||
testImplementation platform("io.qameta.allure:allure-bom")
|
||||
testImplementation "io.qameta.allure:allure-junit5"
|
||||
}
|
||||
|
||||
configurations {
|
||||
agent {
|
||||
canBeResolved = true
|
||||
canBeConsumed = true
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
agent "org.aspectj:aspectjweaver:1.9.25"
|
||||
}
|
||||
|
||||
test {
|
||||
jvmArgs = ["-javaagent:${configurations.agent.singleFile}"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************\
|
||||
* Jacoco
|
||||
**********************************************************************************************************************/
|
||||
subprojects {
|
||||
if (it.name != 'platform' && it.name != 'jmh-benchmarks') {
|
||||
apply plugin: 'jacoco'
|
||||
|
||||
test {
|
||||
finalizedBy jacocoTestReport
|
||||
}
|
||||
|
||||
jacocoTestReport {
|
||||
dependsOn test
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tasks.named('check') {
|
||||
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
|
||||
finalizedBy jacocoTestReport
|
||||
}
|
||||
|
||||
tasks.register('unitTest') {
|
||||
// No jacocoTestReport here, because it depends by default on :test,
|
||||
// and that would make :test being run twice in our CI.
|
||||
// In practice the report will be generated later in the CI by :check.
|
||||
}
|
||||
|
||||
tasks.register('integrationTest') {
|
||||
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
|
||||
finalizedBy jacocoTestReport
|
||||
}
|
||||
|
||||
tasks.register('flakyTest') {
|
||||
dependsOn tasks.named('testCodeCoverageReport', JacocoReport)
|
||||
finalizedBy jacocoTestReport
|
||||
}
|
||||
|
||||
tasks.named('testCodeCoverageReport') {
|
||||
|
||||
@@ -42,7 +42,7 @@ import picocli.CommandLine.Option;
|
||||
@Introspected
|
||||
public abstract class AbstractCommand implements Callable<Integer> {
|
||||
@Inject
|
||||
private ApplicationContext applicationContext;
|
||||
protected ApplicationContext applicationContext;
|
||||
|
||||
@Inject
|
||||
private EndpointDefaultConfiguration endpointConfiguration;
|
||||
|
||||
@@ -93,7 +93,7 @@ public class App implements Callable<Integer> {
|
||||
try {
|
||||
exitCode = new CommandLine(cls, new MicronautFactory(applicationContext)).execute(args);
|
||||
} catch (CommandLine.InitializationException e){
|
||||
System.err.println("Could not initialize picoli ComandLine, err: " + e.getMessage());
|
||||
System.err.println("Could not initialize picocli CommandLine, err: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
exitCode = 1;
|
||||
}
|
||||
|
||||
@@ -18,7 +18,8 @@ import picocli.CommandLine;
|
||||
FlowDotCommand.class,
|
||||
FlowExportCommand.class,
|
||||
FlowUpdateCommand.class,
|
||||
FlowUpdatesCommand.class
|
||||
FlowUpdatesCommand.class,
|
||||
FlowsSyncFromSourceCommand.class
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import io.kestra.cli.AbstractApiCommand;
|
||||
import io.kestra.cli.services.TenantIdSelectorService;
|
||||
import io.kestra.core.models.flows.FlowWithSource;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import jakarta.inject.Inject;
|
||||
import java.util.List;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "syncFromSource",
|
||||
description = "Update a single flow",
|
||||
mixinStandardHelpOptions = true
|
||||
)
|
||||
@Slf4j
|
||||
public class FlowsSyncFromSourceCommand extends AbstractApiCommand {
|
||||
|
||||
@Inject
|
||||
private TenantIdSelectorService tenantService;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
|
||||
FlowRepositoryInterface repository = applicationContext.getBean(FlowRepositoryInterface.class);
|
||||
String tenant = tenantService.getTenantId(tenantId);
|
||||
|
||||
List<FlowWithSource> persistedFlows = repository.findAllWithSource(tenant);
|
||||
|
||||
int count = 0;
|
||||
for (FlowWithSource persistedFlow : persistedFlows) {
|
||||
// Ensure exactly one trailing newline. We need this new line
|
||||
// because when we update a flow from its source,
|
||||
// we don't update it if no change is detected.
|
||||
// The goal here is to force an update from the source for every flows
|
||||
GenericFlow flow = GenericFlow.fromYaml(tenant,persistedFlow.getSource() + System.lineSeparator());
|
||||
repository.update(flow, persistedFlow);
|
||||
stdOut("- %s.%s".formatted(flow.getNamespace(), flow.getId()));
|
||||
count++;
|
||||
}
|
||||
stdOut("%s flow(s) successfully updated!".formatted(count));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
protected boolean loadExternalPlugins() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -10,7 +10,8 @@ import picocli.CommandLine;
|
||||
description = "populate metadata for entities",
|
||||
subcommands = {
|
||||
KvMetadataMigrationCommand.class,
|
||||
SecretsMetadataMigrationCommand.class
|
||||
SecretsMetadataMigrationCommand.class,
|
||||
NsFilesMetadataMigrationCommand.class
|
||||
}
|
||||
)
|
||||
@Slf4j
|
||||
|
||||
@@ -1,47 +1,51 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.models.kv.PersistedKvMetadata;
|
||||
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
|
||||
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.storages.kv.InternalKVStore;
|
||||
import io.kestra.core.storages.kv.KVEntry;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import jakarta.inject.Inject;
|
||||
import io.kestra.core.utils.NamespaceUtils;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.AllArgsConstructor;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
|
||||
@Singleton
|
||||
@AllArgsConstructor
|
||||
public class MetadataMigrationService {
|
||||
@Inject
|
||||
private TenantService tenantService;
|
||||
protected FlowRepositoryInterface flowRepository;
|
||||
protected TenantService tenantService;
|
||||
protected KvMetadataRepositoryInterface kvMetadataRepository;
|
||||
protected NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository;
|
||||
protected StorageInterface storageInterface;
|
||||
protected NamespaceUtils namespaceUtils;
|
||||
|
||||
@Inject
|
||||
private FlowRepositoryInterface flowRepository;
|
||||
|
||||
@Inject
|
||||
private KvMetadataRepositoryInterface kvMetadataRepository;
|
||||
|
||||
@Inject
|
||||
private StorageInterface storageInterface;
|
||||
|
||||
protected Map<String, List<String>> namespacesPerTenant() {
|
||||
@VisibleForTesting
|
||||
public Map<String, List<String>> namespacesPerTenant() {
|
||||
String tenantId = tenantService.resolveTenant();
|
||||
return Map.of(tenantId, flowRepository.findDistinctNamespace(tenantId));
|
||||
return Map.of(tenantId, Stream.concat(
|
||||
Stream.of(namespaceUtils.getSystemFlowNamespace()),
|
||||
flowRepository.findDistinctNamespace(tenantId).stream()
|
||||
).map(NamespaceUtils::asTree).flatMap(Collection::stream).distinct().toList());
|
||||
}
|
||||
|
||||
public void kvMigration() throws IOException {
|
||||
@@ -49,7 +53,9 @@ public class MetadataMigrationService {
|
||||
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
|
||||
.flatMap(throwFunction(namespaceForTenant -> {
|
||||
InternalKVStore kvStore = new InternalKVStore(namespaceForTenant.getKey(), namespaceForTenant.getValue(), storageInterface, kvMetadataRepository);
|
||||
List<FileAttributes> list = listAllFromStorage(storageInterface, namespaceForTenant.getKey(), namespaceForTenant.getValue());
|
||||
List<FileAttributes> list = listAllFromStorage(storageInterface, StorageContext::kvPrefix, namespaceForTenant.getKey(), namespaceForTenant.getValue()).stream()
|
||||
.map(PathAndAttributes::attributes)
|
||||
.toList();
|
||||
Map<Boolean, List<KVEntry>> entriesByIsExpired = list.stream()
|
||||
.map(throwFunction(fileAttributes -> KVEntry.from(namespaceForTenant.getValue(), fileAttributes)))
|
||||
.collect(Collectors.partitioningBy(kvEntry -> Optional.ofNullable(kvEntry.expirationDate()).map(expirationDate -> Instant.now().isAfter(expirationDate)).orElse(false)));
|
||||
@@ -75,15 +81,39 @@ public class MetadataMigrationService {
|
||||
}));
|
||||
}
|
||||
|
||||
public void nsFilesMigration() throws IOException {
|
||||
this.namespacesPerTenant().entrySet().stream()
|
||||
.flatMap(namespacesForTenant -> namespacesForTenant.getValue().stream().map(namespace -> Map.entry(namespacesForTenant.getKey(), namespace)))
|
||||
.flatMap(throwFunction(namespaceForTenant -> {
|
||||
List<PathAndAttributes> list = listAllFromStorage(storageInterface, StorageContext::namespaceFilePrefix, namespaceForTenant.getKey(), namespaceForTenant.getValue());
|
||||
return list.stream()
|
||||
.map(pathAndAttributes -> NamespaceFileMetadata.of(namespaceForTenant.getKey(), namespaceForTenant.getValue(), pathAndAttributes.path(), pathAndAttributes.attributes()));
|
||||
}))
|
||||
.forEach(throwConsumer(nsFileMetadata -> {
|
||||
if (namespaceFileMetadataRepository.findByPath(nsFileMetadata.getTenantId(), nsFileMetadata.getNamespace(), nsFileMetadata.getPath()).isEmpty()) {
|
||||
namespaceFileMetadataRepository.save(nsFileMetadata);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
public void secretMigration() throws Exception {
|
||||
throw new UnsupportedOperationException("Secret migration is not needed in the OSS version");
|
||||
}
|
||||
|
||||
private static List<FileAttributes> listAllFromStorage(StorageInterface storage, String tenant, String namespace) throws IOException {
|
||||
private static List<PathAndAttributes> listAllFromStorage(StorageInterface storage, Function<String, String> prefixFunction, String tenant, String namespace) throws IOException {
|
||||
try {
|
||||
return storage.list(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.kvPrefix(namespace)));
|
||||
} catch (FileNotFoundException e) {
|
||||
String prefix = prefixFunction.apply(namespace);
|
||||
if (!storage.exists(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + prefix))) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
return storage.allByPrefix(tenant, namespace, URI.create(StorageContext.KESTRA_PROTOCOL + prefix + "/"), true).stream()
|
||||
.map(throwFunction(uri -> new PathAndAttributes(uri.getPath().substring(prefix.length()), storage.getAttributes(tenant, namespace, uri))))
|
||||
.toList();
|
||||
} catch (FileNotFoundException | NoSuchFileException e) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
public record PathAndAttributes(String path, FileAttributes attributes) {}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.cli.AbstractCommand;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Provider;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import picocli.CommandLine;
|
||||
|
||||
@CommandLine.Command(
|
||||
name = "nsfiles",
|
||||
description = "populate metadata for Namespace Files"
|
||||
)
|
||||
@Slf4j
|
||||
public class NsFilesMetadataMigrationCommand extends AbstractCommand {
|
||||
@Inject
|
||||
private Provider<MetadataMigrationService> metadataMigrationServiceProvider;
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
super.call();
|
||||
try {
|
||||
metadataMigrationServiceProvider.get().nsFilesMigration();
|
||||
} catch (Exception e) {
|
||||
System.err.println("❌ Namespace Files Metadata migration failed: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
return 1;
|
||||
}
|
||||
System.out.println("✅ Namespace Files Metadata migration complete.");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -57,7 +57,7 @@ public class StateStoreMigrateCommand extends AbstractCommand {
|
||||
String taskRunValue = statesUriPart.length > 2 ? statesUriPart[1] : null;
|
||||
String stateSubName = statesUriPart[statesUriPart.length - 1];
|
||||
boolean flowScoped = flowQualifierWithStateQualifiers[0].endsWith("/" + flow.getId());
|
||||
StateStore stateStore = new StateStore(runContext(runContextFactory, flow), false);
|
||||
StateStore stateStore = new StateStore(runContextFactory.of(flow, Map.of()), false);
|
||||
|
||||
try (InputStream is = storageInterface.get(flow.getTenantId(), flow.getNamespace(), stateStoreFileUri)) {
|
||||
stateStore.putState(flowScoped, stateName, stateSubName, taskRunValue, is.readAllBytes());
|
||||
@@ -70,12 +70,4 @@ public class StateStoreMigrateCommand extends AbstractCommand {
|
||||
stdOut("Successfully ran the state-store migration.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
private RunContext runContext(RunContextFactory runContextFactory, Flow flow) {
|
||||
Map<String, String> flowVariables = new HashMap<>();
|
||||
flowVariables.put("tenantId", flow.getTenantId());
|
||||
flowVariables.put("id", flow.getId());
|
||||
flowVariables.put("namespace", flow.getNamespace());
|
||||
return runContextFactory.of(flow, Map.of("flow", flowVariables));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,73 @@
|
||||
package io.kestra.cli.commands.flows;
|
||||
|
||||
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.runtime.server.EmbeddedServer;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class FlowsSyncFromSourceCommandTest {
|
||||
@Test
|
||||
void updateAllFlowsFromSource() {
|
||||
URL directory = FlowUpdatesCommandTest.class.getClassLoader().getResource("flows");
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
|
||||
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
|
||||
embeddedServer.start();
|
||||
|
||||
String[] args = {
|
||||
"--plugins",
|
||||
"/tmp", // pass this arg because it can cause failure
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word",
|
||||
"--delete",
|
||||
directory.getPath(),
|
||||
};
|
||||
PicocliRunner.call(FlowUpdatesCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("successfully updated !");
|
||||
out.reset();
|
||||
|
||||
FlowRepositoryInterface repository = ctx.getBean(FlowRepositoryInterface.class);
|
||||
List<Flow> flows = repository.findAll(MAIN_TENANT);
|
||||
for (Flow flow : flows) {
|
||||
assertThat(flow.getRevision()).isEqualTo(1);
|
||||
}
|
||||
|
||||
args = new String[]{
|
||||
"--plugins",
|
||||
"/tmp", // pass this arg because it can cause failure
|
||||
"--server",
|
||||
embeddedServer.getURL().toString(),
|
||||
"--user",
|
||||
"myuser:pass:word"
|
||||
|
||||
};
|
||||
PicocliRunner.call(FlowsSyncFromSourceCommand.class, ctx, args);
|
||||
|
||||
assertThat(out.toString()).contains("4 flow(s) successfully updated!");
|
||||
assertThat(out.toString()).contains("- io.kestra.outsider.quattro");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.second");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.third");
|
||||
assertThat(out.toString()).contains("- io.kestra.cli.first");
|
||||
|
||||
flows = repository.findAll(MAIN_TENANT);
|
||||
for (Flow flow : flows) {
|
||||
assertThat(flow.getRevision()).isEqualTo(2);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.core.utils.NamespaceUtils;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class MetadataMigrationServiceTest<T extends MetadataMigrationService> {
|
||||
private static final String TENANT_ID = TestsUtils.randomTenant();
|
||||
|
||||
protected static final String SYSTEM_NAMESPACE = "my.system.namespace";
|
||||
|
||||
@Test
|
||||
void namespacesPerTenant() {
|
||||
Map<String, List<String>> expected = getNamespacesPerTenant();
|
||||
Map<String, List<String>> result = metadataMigrationService(
|
||||
expected
|
||||
).namespacesPerTenant();
|
||||
|
||||
assertThat(result).hasSize(expected.size());
|
||||
expected.forEach((tenantId, namespaces) -> {
|
||||
assertThat(result.get(tenantId)).containsExactlyInAnyOrderElementsOf(
|
||||
Stream.concat(
|
||||
Stream.of(SYSTEM_NAMESPACE),
|
||||
namespaces.stream()
|
||||
).map(NamespaceUtils::asTree).flatMap(Collection::stream).distinct().toList()
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
protected Map<String, List<String>> getNamespacesPerTenant() {
|
||||
return Map.of(TENANT_ID, List.of("my.first.namespace", "my.second.namespace", "another.namespace"));
|
||||
}
|
||||
|
||||
protected T metadataMigrationService(Map<String, List<String>> namespacesPerTenant) {
|
||||
FlowRepositoryInterface mockedFlowRepository = Mockito.mock(FlowRepositoryInterface.class);
|
||||
Mockito.doAnswer((params) -> namespacesPerTenant.get(params.getArgument(0).toString())).when(mockedFlowRepository).findDistinctNamespace(Mockito.anyString());
|
||||
NamespaceUtils namespaceUtils = Mockito.mock(NamespaceUtils.class);
|
||||
Mockito.when(namespaceUtils.getSystemFlowNamespace()).thenReturn(SYSTEM_NAMESPACE);
|
||||
//noinspection unchecked
|
||||
return ((T) new MetadataMigrationService(mockedFlowRepository, new TenantService() {
|
||||
@Override
|
||||
public String resolveTenant() {
|
||||
return TENANT_ID;
|
||||
}
|
||||
}, null, null, null, namespaceUtils));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,175 @@
|
||||
package io.kestra.cli.commands.migrations.metadata;
|
||||
|
||||
import io.kestra.cli.App;
|
||||
import io.kestra.core.exceptions.ResourceExpiredException;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.GenericFlow;
|
||||
import io.kestra.core.models.kv.PersistedKvMetadata;
|
||||
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
|
||||
import io.kestra.core.repositories.NamespaceFileMetadataRepositoryInterface;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.*;
|
||||
import io.kestra.core.storages.kv.*;
|
||||
import io.kestra.core.tenant.TenantService;
|
||||
import io.kestra.core.utils.TestsUtils;
|
||||
import io.kestra.plugin.core.log.Log;
|
||||
import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import io.micronaut.core.annotation.NonNull;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
|
||||
public class NsFilesMetadataMigrationCommandTest {
|
||||
@Test
|
||||
void run() throws IOException {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
ByteArrayOutputStream err = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(err));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
/* Initial setup:
|
||||
* - namespace 1: my/path, value
|
||||
* - namespace 1: another/path
|
||||
* - namespace 2: yet/another/path
|
||||
* - Nothing in database */
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
String path = "/my/path";
|
||||
StorageInterface storage = ctx.getBean(StorageInterface.class);
|
||||
String value = "someValue";
|
||||
putOldNsFile(storage, namespace, path, value);
|
||||
|
||||
String anotherPath = "/another/path";
|
||||
String anotherValue = "anotherValue";
|
||||
putOldNsFile(storage, namespace, anotherPath, anotherValue);
|
||||
|
||||
String anotherNamespace = TestsUtils.randomNamespace();
|
||||
String yetAnotherPath = "/yet/another/path";
|
||||
String yetAnotherValue = "yetAnotherValue";
|
||||
putOldNsFile(storage, anotherNamespace, yetAnotherPath, yetAnotherValue);
|
||||
|
||||
NamespaceFileMetadataRepositoryInterface namespaceFileMetadataRepository = ctx.getBean(NamespaceFileMetadataRepositoryInterface.class);
|
||||
String tenantId = TenantService.MAIN_TENANT;
|
||||
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, path).isPresent()).isFalse();
|
||||
|
||||
/* Expected outcome from the migration command:
|
||||
* - no namespace files has been migrated because no flow exist in the namespace so they are not picked up because we don't know they exist */
|
||||
String[] nsFilesMetadataMigrationCommand = {
|
||||
"migrate", "metadata", "nsfiles"
|
||||
};
|
||||
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
|
||||
|
||||
|
||||
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
|
||||
// Still it's not in the metadata repository because no flow exist to find that namespace file
|
||||
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, path).isPresent()).isFalse();
|
||||
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, namespace, anotherPath).isPresent()).isFalse();
|
||||
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, anotherNamespace, yetAnotherPath).isPresent()).isFalse();
|
||||
|
||||
// A flow is created from namespace 1, so the namespace files in this namespace should be migrated
|
||||
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
|
||||
flowRepository.create(GenericFlow.of(Flow.builder()
|
||||
.tenantId(tenantId)
|
||||
.id("a-flow")
|
||||
.namespace(namespace)
|
||||
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
|
||||
.build()));
|
||||
|
||||
/* We run the migration again:
|
||||
* - namespace 1 my/path file is seen and metadata is migrated to database
|
||||
* - namespace 1 another/path file is seen and metadata is migrated to database
|
||||
* - namespace 2 yet/another/path is not seen because no flow exist in this namespace */
|
||||
out.reset();
|
||||
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
|
||||
|
||||
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
|
||||
Optional<NamespaceFileMetadata> foundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, path);
|
||||
assertThat(foundNsFile.isPresent()).isTrue();
|
||||
assertThat(foundNsFile.get().getVersion()).isEqualTo(1);
|
||||
assertThat(foundNsFile.get().getSize()).isEqualTo(value.length());
|
||||
|
||||
Optional<NamespaceFileMetadata> anotherFoundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, anotherPath);
|
||||
assertThat(anotherFoundNsFile.isPresent()).isTrue();
|
||||
assertThat(anotherFoundNsFile.get().getVersion()).isEqualTo(1);
|
||||
assertThat(anotherFoundNsFile.get().getSize()).isEqualTo(anotherValue.length());
|
||||
|
||||
NamespaceFactory namespaceFactory = ctx.getBean(NamespaceFactory.class);
|
||||
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storage);
|
||||
FileAttributes nsFileRawMetadata = namespaceStorage.getFileMetadata(Path.of(path));
|
||||
assertThat(nsFileRawMetadata.getSize()).isEqualTo(value.length());
|
||||
assertThat(new String(namespaceStorage.getFileContent(Path.of(path)).readAllBytes())).isEqualTo(value);
|
||||
|
||||
FileAttributes anotherNsFileRawMetadata = namespaceStorage.getFileMetadata(Path.of(anotherPath));
|
||||
assertThat(anotherNsFileRawMetadata.getSize()).isEqualTo(anotherValue.length());
|
||||
assertThat(new String(namespaceStorage.getFileContent(Path.of(anotherPath)).readAllBytes())).isEqualTo(anotherValue);
|
||||
|
||||
assertThat(namespaceFileMetadataRepository.findByPath(tenantId, anotherNamespace, yetAnotherPath).isPresent()).isFalse();
|
||||
assertThatThrownBy(() -> namespaceStorage.getFileMetadata(Path.of(yetAnotherPath))).isInstanceOf(FileNotFoundException.class);
|
||||
|
||||
/* We run one last time the migration without any change to verify that we don't resave an existing metadata.
|
||||
* It covers the case where user didn't perform the migrate command yet but they played and added some KV from the UI (so those ones will already be in metadata database). */
|
||||
out.reset();
|
||||
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
|
||||
|
||||
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
|
||||
foundNsFile = namespaceFileMetadataRepository.findByPath(tenantId, namespace, path);
|
||||
assertThat(foundNsFile.get().getVersion()).isEqualTo(1);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void namespaceWithoutNsFile() {
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
System.setOut(new PrintStream(out));
|
||||
ByteArrayOutputStream err = new ByteArrayOutputStream();
|
||||
System.setErr(new PrintStream(err));
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
String tenantId = TenantService.MAIN_TENANT;
|
||||
String namespace = TestsUtils.randomNamespace();
|
||||
|
||||
// A flow is created from namespace 1, so the namespace files in this namespace should be migrated
|
||||
FlowRepositoryInterface flowRepository = ctx.getBean(FlowRepositoryInterface.class);
|
||||
flowRepository.create(GenericFlow.of(Flow.builder()
|
||||
.tenantId(tenantId)
|
||||
.id("a-flow")
|
||||
.namespace(namespace)
|
||||
.tasks(List.of(Log.builder().id("log").type(Log.class.getName()).message("logging").build()))
|
||||
.build()));
|
||||
|
||||
String[] nsFilesMetadataMigrationCommand = {
|
||||
"migrate", "metadata", "nsfiles"
|
||||
};
|
||||
PicocliRunner.call(App.class, ctx, nsFilesMetadataMigrationCommand);
|
||||
|
||||
assertThat(out.toString()).contains("✅ Namespace Files Metadata migration complete.");
|
||||
assertThat(err.toString()).doesNotContain("java.nio.file.NoSuchFileException");
|
||||
}
|
||||
}
|
||||
|
||||
private static void putOldNsFile(StorageInterface storage, String namespace, String path, String value) throws IOException {
|
||||
URI nsFileStorageUri = getNsFileStorageUri(namespace, path);
|
||||
storage.put(TenantService.MAIN_TENANT, namespace, nsFileStorageUri, new StorageObject(
|
||||
null,
|
||||
new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8))
|
||||
));
|
||||
}
|
||||
|
||||
private static @NonNull URI getNsFileStorageUri(String namespace, String path) {
|
||||
return URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.namespaceFilePrefix(namespace) + path);
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,6 @@ import io.micronaut.configuration.picocli.PicocliRunner;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.env.Environment;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
@@ -15,7 +14,6 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@@ -25,7 +23,8 @@ class PluginDocCommandTest {
|
||||
|
||||
@Test
|
||||
void run() throws IOException, URISyntaxException {
|
||||
Path pluginsPath = Files.createTempDirectory(PluginListCommandTest.class.getSimpleName());
|
||||
var testDirectoryName = PluginListCommandTest.class.getSimpleName();
|
||||
Path pluginsPath = Files.createTempDirectory(testDirectoryName + "_pluginsPath_");
|
||||
pluginsPath.toFile().deleteOnExit();
|
||||
|
||||
FileUtils.copyFile(
|
||||
@@ -34,7 +33,7 @@ class PluginDocCommandTest {
|
||||
new File(URI.create("file://" + pluginsPath.toAbsolutePath() + "/" + PLUGIN_TEMPLATE_TEST))
|
||||
);
|
||||
|
||||
Path docPath = Files.createTempDirectory(PluginInstallCommandTest.class.getSimpleName());
|
||||
Path docPath = Files.createTempDirectory(testDirectoryName + "_docPath_");
|
||||
docPath.toFile().deleteOnExit();
|
||||
|
||||
try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
|
||||
@@ -43,9 +42,9 @@ class PluginDocCommandTest {
|
||||
|
||||
List<Path> files = Files.list(docPath).toList();
|
||||
|
||||
assertThat(files.size()).isEqualTo(1);
|
||||
assertThat(files.getFirst().getFileName().toString()).isEqualTo("plugin-template-test");
|
||||
var directory = files.getFirst().toFile();
|
||||
assertThat(files.stream().map(path -> path.getFileName().toString())).contains("plugin-template-test");
|
||||
// don't know why, but sometimes there is an addition "plugin-notifications" directory present
|
||||
var directory = files.stream().filter(path -> "plugin-template-test".equals(path.getFileName().toString())).findFirst().get().toFile();
|
||||
assertThat(directory.isDirectory()).isTrue();
|
||||
assertThat(directory.listFiles().length).isEqualTo(3);
|
||||
|
||||
|
||||
@@ -55,11 +55,7 @@ class StateStoreMigrateCommandTest {
|
||||
);
|
||||
assertThat(storage.exists(tenantId, flow.getNamespace(), oldStateStoreUri)).isTrue();
|
||||
|
||||
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of("flow", Map.of(
|
||||
"tenantId", tenantId,
|
||||
"id", flow.getId(),
|
||||
"namespace", flow.getNamespace()
|
||||
)));
|
||||
RunContext runContext = ctx.getBean(RunContextFactory.class).of(flow, Map.of());
|
||||
StateStore stateStore = new StateStore(runContext, true);
|
||||
Assertions.assertThrows(MigrationRequiredException.class, () -> stateStore.getState(true, "my-state", "sub-name", "my-taskrun-value"));
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.junitpioneer.jupiter.RetryingTest;
|
||||
|
||||
import static io.kestra.core.utils.Rethrow.throwRunnable;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
@@ -59,7 +58,7 @@ class FileChangedEventListenerTest {
|
||||
}
|
||||
|
||||
@FlakyTest
|
||||
@RetryingTest(2)
|
||||
@Test
|
||||
void test() throws IOException, TimeoutException {
|
||||
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getSimpleName(), "test");
|
||||
// remove the flow if it already exists
|
||||
@@ -98,7 +97,7 @@ class FileChangedEventListenerTest {
|
||||
}
|
||||
|
||||
@FlakyTest
|
||||
@RetryingTest(2)
|
||||
@Test
|
||||
void testWithPluginDefault() throws IOException, TimeoutException {
|
||||
var tenant = TestsUtils.randomTenant(FileChangedEventListenerTest.class.getName(), "testWithPluginDefault");
|
||||
// remove the flow if it already exists
|
||||
@@ -138,4 +137,4 @@ class FileChangedEventListenerTest {
|
||||
Duration.ofSeconds(10)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,8 +82,8 @@ dependencies {
|
||||
testImplementation "io.micronaut:micronaut-http-server-netty"
|
||||
testImplementation "io.micronaut:micronaut-management"
|
||||
|
||||
testImplementation "org.testcontainers:testcontainers:1.21.3"
|
||||
testImplementation "org.testcontainers:junit-jupiter:1.21.3"
|
||||
testImplementation "org.testcontainers:testcontainers:1.21.4"
|
||||
testImplementation "org.testcontainers:junit-jupiter:1.21.4"
|
||||
testImplementation "org.bouncycastle:bcpkix-jdk18on"
|
||||
|
||||
testImplementation "org.wiremock:wiremock-jetty12"
|
||||
|
||||
@@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@@ -84,6 +85,11 @@ public abstract class KestraContext {
|
||||
|
||||
public abstract StorageInterface getStorageInterface();
|
||||
|
||||
/**
|
||||
* Returns the Micronaut active environments.
|
||||
*/
|
||||
public abstract Set<String> getEnvironments();
|
||||
|
||||
/**
|
||||
* Shutdowns the Kestra application.
|
||||
*/
|
||||
@@ -182,5 +188,10 @@ public abstract class KestraContext {
|
||||
// Lazy init of the PluginRegistry.
|
||||
return this.applicationContext.getBean(StorageInterface.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getEnvironments() {
|
||||
return this.applicationContext.getEnvironment().getActiveNames();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,13 +42,12 @@ import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.plugins.RegisteredPlugin;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import io.swagger.v3.oas.annotations.media.Content;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.*;
|
||||
import java.time.*;
|
||||
@@ -299,7 +298,9 @@ public class JsonSchemaGenerator {
|
||||
}
|
||||
|
||||
// default value
|
||||
builder.forFields().withDefaultResolver(this::defaults);
|
||||
builder.forFields()
|
||||
.withIgnoreCheck(fieldScope -> fieldScope.getAnnotation(Hidden.class) != null)
|
||||
.withDefaultResolver(this::defaults);
|
||||
|
||||
// def name
|
||||
builder.forTypesInGeneral()
|
||||
@@ -809,9 +810,9 @@ public class JsonSchemaGenerator {
|
||||
// we don't return base properties unless specified with @PluginProperty and hidden is false
|
||||
builder
|
||||
.forFields()
|
||||
.withIgnoreCheck(fieldScope -> base != null &&
|
||||
.withIgnoreCheck(fieldScope -> (base != null &&
|
||||
(fieldScope.getAnnotation(PluginProperty.class) == null || fieldScope.getAnnotation(PluginProperty.class).hidden()) &&
|
||||
fieldScope.getDeclaringType().getTypeName().equals(base.getName())
|
||||
fieldScope.getDeclaringType().getTypeName().equals(base.getName())) || fieldScope.getAnnotation(Hidden.class) != null
|
||||
);
|
||||
|
||||
SchemaGeneratorConfig schemaGeneratorConfig = builder.build();
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.kestra.core.docs;
|
||||
import io.kestra.core.models.annotations.PluginSubGroup;
|
||||
import io.kestra.core.plugins.RegisteredPlugin;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@@ -117,10 +118,17 @@ public class Plugin {
|
||||
.filter(not(io.kestra.core.models.Plugin::isInternal))
|
||||
.filter(clazzFilter)
|
||||
.filter(c -> !c.getName().startsWith("org.kestra."))
|
||||
.map(c -> new PluginElementMetadata(c.getName(), io.kestra.core.models.Plugin.isDeprecated(c) ? true : null))
|
||||
.map(c -> {
|
||||
Schema schema = c.getAnnotation(Schema.class);
|
||||
|
||||
var title = Optional.ofNullable(schema).map(Schema::title).filter(t -> !t.isEmpty()).orElse(null);
|
||||
var description = Optional.ofNullable(schema).map(Schema::description).filter(d -> !d.isEmpty()).orElse(null);
|
||||
var deprecated = io.kestra.core.models.Plugin.isDeprecated(c) ? true : null;
|
||||
|
||||
return new PluginElementMetadata(c.getName(), deprecated, title, description);
|
||||
})
|
||||
.toList();
|
||||
}
|
||||
|
||||
public record PluginElementMetadata(String cls, Boolean deprecated) {
|
||||
}
|
||||
public record PluginElementMetadata(String cls, Boolean deprecated, String title, String description) {}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
/**
|
||||
* Exception that can be thrown when a Flow is not found.
|
||||
*/
|
||||
public class FlowNotFoundException extends NotFoundException {
|
||||
|
||||
/**
|
||||
* Creates a new {@link FlowNotFoundException} instance.
|
||||
*/
|
||||
public FlowNotFoundException() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link NotFoundException} instance.
|
||||
*
|
||||
* @param message the error message.
|
||||
*/
|
||||
public FlowNotFoundException(final String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import io.kestra.core.models.flows.Data;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Exception that can be thrown when Inputs/Outputs have validation problems.
|
||||
*/
|
||||
public class InputOutputValidationException extends KestraRuntimeException {
|
||||
public InputOutputValidationException(String message) {
|
||||
super(message);
|
||||
}
|
||||
public static InputOutputValidationException of( String message, Input<?> input){
|
||||
String inputMessage = "Invalid value for input" + " `" + input.getId() + "`. Cause: " + message;
|
||||
return new InputOutputValidationException(inputMessage);
|
||||
}
|
||||
public static InputOutputValidationException of( String message, Output output){
|
||||
String outputMessage = "Invalid value for output" + " `" + output.getId() + "`. Cause: " + message;
|
||||
return new InputOutputValidationException(outputMessage);
|
||||
}
|
||||
public static InputOutputValidationException of(String message){
|
||||
return new InputOutputValidationException(message);
|
||||
}
|
||||
|
||||
public static InputOutputValidationException merge(Set<InputOutputValidationException> exceptions){
|
||||
String combinedMessage = exceptions.stream()
|
||||
.map(InputOutputValidationException::getMessage)
|
||||
.collect(Collectors.joining(System.lineSeparator()));
|
||||
throw new InputOutputValidationException(combinedMessage);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* The top-level {@link KestraRuntimeException} for non-recoverable errors.
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
package io.kestra.core.exceptions;
|
||||
|
||||
import java.io.Serial;
|
||||
|
||||
public class ResourceAccessDeniedException extends KestraRuntimeException {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public ResourceAccessDeniedException() {
|
||||
}
|
||||
|
||||
public ResourceAccessDeniedException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
@@ -7,7 +7,6 @@ import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipInputStream;
|
||||
@@ -65,7 +64,7 @@ public interface HasSource {
|
||||
|
||||
if (isYAML(fileName)) {
|
||||
byte[] bytes = inputStream.readAllBytes();
|
||||
List<String> sources = List.of(new String(bytes).split("---"));
|
||||
List<String> sources = List.of(new String(bytes).split("(?m)^---\\s*$"));
|
||||
for (int i = 0; i < sources.size(); i++) {
|
||||
String source = sources.get(i);
|
||||
reader.accept(source, String.valueOf(i));
|
||||
|
||||
@@ -4,13 +4,16 @@ import io.kestra.core.utils.MapUtils;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Schema(description = "A key/value pair that can be attached to a Flow or Execution. Labels are often used to organize and categorize objects.")
|
||||
public record Label(@NotEmpty String key, @NotEmpty String value) {
|
||||
public record Label(
|
||||
@NotEmpty @Pattern(regexp = "^[\\p{Ll}][\\p{L}0-9._-]*$", message = "Invalid label key. A valid key contains only lowercase letters numbers hyphens (-) underscores (_) or periods (.) and must begin with a lowercase letter.") String key,
|
||||
@NotEmpty String value) {
|
||||
public static final String SYSTEM_PREFIX = "system.";
|
||||
|
||||
// system labels
|
||||
@@ -23,6 +26,7 @@ public record Label(@NotEmpty String key, @NotEmpty String value) {
|
||||
public static final String REPLAYED = SYSTEM_PREFIX + "replayed";
|
||||
public static final String SIMULATED_EXECUTION = SYSTEM_PREFIX + "simulatedExecution";
|
||||
public static final String TEST = SYSTEM_PREFIX + "test";
|
||||
public static final String FROM = SYSTEM_PREFIX + "from";
|
||||
|
||||
/**
|
||||
* Static helper method for converting a list of labels to a nested map.
|
||||
|
||||
@@ -94,7 +94,7 @@ public record QueryFilter(
|
||||
KIND("kind") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS,Op.NOT_EQUALS);
|
||||
return List.of(Op.EQUALS,Op.NOT_EQUALS, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
LABELS("labels") {
|
||||
@@ -106,7 +106,7 @@ public record QueryFilter(
|
||||
FLOW_ID("flowId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX);
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.REGEX, Op.IN, Op.NOT_IN, Op.PREFIX);
|
||||
}
|
||||
},
|
||||
UPDATED("updated") {
|
||||
@@ -151,6 +151,12 @@ public record QueryFilter(
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
|
||||
}
|
||||
},
|
||||
TRIGGER_STATE("triggerState"){
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
EXECUTION_ID("executionId") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
@@ -180,6 +186,24 @@ public record QueryFilter(
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS);
|
||||
}
|
||||
},
|
||||
PATH("path") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.IN);
|
||||
}
|
||||
},
|
||||
PARENT_PATH("parentPath") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.STARTS_WITH);
|
||||
}
|
||||
},
|
||||
VERSION("version") {
|
||||
@Override
|
||||
public List<Op> supportedOp() {
|
||||
return List.of(Op.EQUALS, Op.NOT_EQUALS);
|
||||
}
|
||||
};
|
||||
|
||||
private static final Map<String, Field> BY_VALUE = Arrays.stream(values())
|
||||
@@ -208,7 +232,7 @@ public record QueryFilter(
|
||||
FLOW {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(Field.LABELS, Field.NAMESPACE, Field.QUERY, Field.SCOPE);
|
||||
return List.of(Field.LABELS, Field.NAMESPACE, Field.QUERY, Field.SCOPE, Field.FLOW_ID);
|
||||
}
|
||||
},
|
||||
NAMESPACE {
|
||||
@@ -223,7 +247,7 @@ public record QueryFilter(
|
||||
return List.of(
|
||||
Field.QUERY, Field.SCOPE, Field.FLOW_ID, Field.START_DATE, Field.END_DATE,
|
||||
Field.STATE, Field.LABELS, Field.TRIGGER_EXECUTION_ID, Field.CHILD_FILTER,
|
||||
Field.NAMESPACE,Field.KIND
|
||||
Field.NAMESPACE, Field.KIND
|
||||
);
|
||||
}
|
||||
},
|
||||
@@ -253,7 +277,7 @@ public record QueryFilter(
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.WORKER_ID, Field.FLOW_ID,
|
||||
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID
|
||||
Field.START_DATE, Field.END_DATE, Field.TRIGGER_ID, Field.TRIGGER_STATE
|
||||
);
|
||||
}
|
||||
},
|
||||
@@ -275,6 +299,19 @@ public record QueryFilter(
|
||||
Field.UPDATED
|
||||
);
|
||||
}
|
||||
},
|
||||
NAMESPACE_FILE_METADATA {
|
||||
@Override
|
||||
public List<Field> supportedField() {
|
||||
return List.of(
|
||||
Field.QUERY,
|
||||
Field.NAMESPACE,
|
||||
Field.PATH,
|
||||
Field.PARENT_PATH,
|
||||
Field.VERSION,
|
||||
Field.UPDATED
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
public abstract List<Field> supportedField();
|
||||
|
||||
@@ -16,6 +16,7 @@ import jakarta.validation.constraints.NotNull;
|
||||
public class Setting {
|
||||
public static final String INSTANCE_UUID = "instance.uuid";
|
||||
public static final String INSTANCE_VERSION = "instance.version";
|
||||
public static final String INSTANCE_EDITION = "instance.edition";
|
||||
|
||||
@NotNull
|
||||
private String key;
|
||||
|
||||
@@ -658,18 +658,20 @@ public class Execution implements DeletedInterface, TenantInterface {
|
||||
public boolean hasFailedNoRetry(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
|
||||
return this.findTaskRunByTasks(resolvedTasks, parentTaskRun)
|
||||
.stream()
|
||||
.anyMatch(taskRun -> {
|
||||
ResolvedTask resolvedTask = resolvedTasks.stream()
|
||||
.filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
|
||||
.orElse(null);
|
||||
if (resolvedTask == null) {
|
||||
log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
|
||||
taskRun.getId(), parentTaskRun.getId());
|
||||
return false;
|
||||
}
|
||||
return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry())
|
||||
&& taskRun.getState().isFailed();
|
||||
});
|
||||
// NOTE: we check on isFailed first to avoid the costly shouldBeRetried() method
|
||||
.anyMatch(taskRun -> taskRun.getState().isFailed() && shouldNotBeRetried(resolvedTasks, parentTaskRun, taskRun));
|
||||
}
|
||||
|
||||
private static boolean shouldNotBeRetried(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun, TaskRun taskRun) {
|
||||
ResolvedTask resolvedTask = resolvedTasks.stream()
|
||||
.filter(t -> t.getTask().getId().equals(taskRun.getTaskId())).findFirst()
|
||||
.orElse(null);
|
||||
if (resolvedTask == null) {
|
||||
log.warn("Can't find task for taskRun '{}' in parentTaskRun '{}'",
|
||||
taskRun.getId(), parentTaskRun.getId());
|
||||
return false;
|
||||
}
|
||||
return !taskRun.shouldBeRetried(resolvedTask.getTask().getRetry());
|
||||
}
|
||||
|
||||
public boolean hasCreated() {
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
package io.kestra.core.models.executions;
|
||||
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.Builder;
|
||||
import lombok.Value;
|
||||
import io.kestra.core.models.tasks.Output;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Builder;
|
||||
import lombok.Value;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
@Value
|
||||
@Builder
|
||||
@@ -21,6 +22,7 @@ public class ExecutionTrigger {
|
||||
@NotNull
|
||||
String type;
|
||||
|
||||
@Schema(type = "object", additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
|
||||
Map<String, Object> variables;
|
||||
|
||||
URI logFile;
|
||||
|
||||
@@ -3,9 +3,7 @@ package io.kestra.core.models.executions;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.retrys.AbstractRetry;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
@@ -95,8 +93,16 @@ public class TaskRun implements TenantInterface {
|
||||
this.forceExecution
|
||||
);
|
||||
}
|
||||
public TaskRun withStateAndAttempt(State.Type state) {
|
||||
List<TaskRunAttempt> newAttempts = new ArrayList<>(this.attempts != null ? this.attempts : List.of());
|
||||
|
||||
if (newAttempts.isEmpty()) {
|
||||
newAttempts.add(TaskRunAttempt.builder().state(new State(state)).build());
|
||||
} else {
|
||||
TaskRunAttempt updatedLast = newAttempts.getLast().withState(state);
|
||||
newAttempts.set(newAttempts.size() - 1, updatedLast);
|
||||
}
|
||||
|
||||
public TaskRun replaceState(State newState) {
|
||||
return new TaskRun(
|
||||
this.tenantId,
|
||||
this.id,
|
||||
@@ -106,9 +112,9 @@ public class TaskRun implements TenantInterface {
|
||||
this.taskId,
|
||||
this.parentTaskRunId,
|
||||
this.value,
|
||||
this.attempts,
|
||||
newAttempts,
|
||||
this.outputs,
|
||||
newState,
|
||||
this.state.withState(state),
|
||||
this.iteration,
|
||||
this.dynamic,
|
||||
this.forceExecution
|
||||
@@ -314,4 +320,11 @@ public class TaskRun implements TenantInterface {
|
||||
.build();
|
||||
}
|
||||
|
||||
public TaskRun addAttempt(TaskRunAttempt attempt) {
|
||||
if (this.attempts == null) {
|
||||
this.attempts = new ArrayList<>();
|
||||
}
|
||||
this.attempts.add(attempt);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,4 +24,8 @@ public class Concurrency {
|
||||
public enum Behavior {
|
||||
QUEUE, CANCEL, FAIL;
|
||||
}
|
||||
|
||||
public static boolean possibleTransitions(State.Type type) {
|
||||
return type.equals(State.Type.CANCELLED) || type.equals(State.Type.FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
/**
|
||||
* Interface for defining an identifiable and typed data.
|
||||
@@ -29,16 +27,4 @@ public interface Data {
|
||||
*/
|
||||
String getDisplayName();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
default ConstraintViolationException toConstraintViolationException(String message, Object value) {
|
||||
Class<Data> cls = (Class<Data>) this.getClass();
|
||||
|
||||
return ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid " + (this instanceof Output ? "output" : "input") + " for `" + getId() + "`, " + message + ", but received `" + value + "`",
|
||||
this,
|
||||
cls,
|
||||
this.getId(),
|
||||
value
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
@@ -11,6 +10,7 @@ import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.flows.check.Check;
|
||||
import io.kestra.core.models.flows.sla.SLA;
|
||||
import io.kestra.core.models.listeners.Listener;
|
||||
import io.kestra.core.models.tasks.FlowableTask;
|
||||
@@ -130,6 +130,14 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
@PluginProperty
|
||||
List<SLA> sla;
|
||||
|
||||
@Schema(
|
||||
title = "Conditions evaluated before the flow is executed.",
|
||||
description = "A list of conditions that are evaluated before the flow is executed. If no checks are defined, the flow executes normally."
|
||||
)
|
||||
@Valid
|
||||
@PluginProperty
|
||||
List<Check> checks;
|
||||
|
||||
public Stream<String> allTypes() {
|
||||
return Stream.of(
|
||||
Optional.ofNullable(triggers).orElse(Collections.emptyList()).stream().map(AbstractTrigger::getType),
|
||||
@@ -346,7 +354,7 @@ public class Flow extends AbstractFlow implements HasUID {
|
||||
* To be conservative a flow MUST not return any source.
|
||||
*/
|
||||
@Override
|
||||
@JsonIgnore
|
||||
@Schema(hidden = true)
|
||||
public String getSource() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
package io.kestra.core.models.flows;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import io.micronaut.core.annotation.Introspected;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@SuperBuilder(toBuilder = true)
|
||||
@Getter
|
||||
@@ -43,11 +41,12 @@ public class FlowWithSource extends Flow {
|
||||
.concurrency(this.concurrency)
|
||||
.retry(this.retry)
|
||||
.sla(this.sla)
|
||||
.checks(this.checks)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonIgnore(value = false)
|
||||
@Schema(hidden = false)
|
||||
public String getSource() {
|
||||
return this.source;
|
||||
}
|
||||
@@ -85,6 +84,7 @@ public class FlowWithSource extends Flow {
|
||||
.concurrency(flow.concurrency)
|
||||
.retry(flow.retry)
|
||||
.sla(flow.sla)
|
||||
.checks(flow.checks)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,12 +84,24 @@ public class State {
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* non-terminated execution duration is hard to provide in SQL, so we set it to null when endDate is empty
|
||||
*/
|
||||
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
|
||||
public Duration getDuration() {
|
||||
return Duration.between(
|
||||
this.histories.getFirst().getDate(),
|
||||
this.histories.size() > 1 ? this.histories.get(this.histories.size() - 1).getDate() : Instant.now()
|
||||
);
|
||||
@JsonInclude(JsonInclude.Include.NON_EMPTY)
|
||||
public Optional<Duration> getDuration() {
|
||||
if (this.getEndDate().isPresent()) {
|
||||
return Optional.of(Duration.between(this.getStartDate(), this.getEndDate().get()));
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return either the Duration persisted in database, or calculate it on the fly for non-terminated executions
|
||||
*/
|
||||
public Duration getDurationOrComputeIt() {
|
||||
return this.getDuration().orElseGet(() -> Duration.between(this.getStartDate(), Instant.now()));
|
||||
}
|
||||
|
||||
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
|
||||
@@ -109,7 +121,7 @@ public class State {
|
||||
|
||||
public String humanDuration() {
|
||||
try {
|
||||
return DurationFormatUtils.formatDurationHMS(getDuration().toMillis());
|
||||
return DurationFormatUtils.formatDurationHMS(getDurationOrComputeIt().toMillis());
|
||||
} catch (Throwable e) {
|
||||
return getDuration().toString();
|
||||
}
|
||||
@@ -255,6 +267,10 @@ public class State {
|
||||
return this == Type.RUNNING || this == Type.KILLING;
|
||||
}
|
||||
|
||||
public boolean onlyRunning() {
|
||||
return this == Type.RUNNING;
|
||||
}
|
||||
|
||||
public boolean isFailed() {
|
||||
return this == Type.FAILED;
|
||||
}
|
||||
|
||||
109
core/src/main/java/io/kestra/core/models/flows/check/Check.java
Normal file
109
core/src/main/java/io/kestra/core/models/flows/check/Check.java
Normal file
@@ -0,0 +1,109 @@
|
||||
package io.kestra.core.models.flows.check;
|
||||
|
||||
import jakarta.validation.constraints.NotEmpty;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Represents a check within a Kestra flow.
|
||||
* <p>
|
||||
* A {@code Check} defines a boolean condition that is evaluated when validating flow's inputs
|
||||
* and before triggering an execution.
|
||||
* <p>
|
||||
* If the condition evaluates to {@code false}, the configured {@link Behavior}
|
||||
* determines how the execution proceeds, and the {@link Style} determines how
|
||||
* the message is visually presented in the UI.
|
||||
* </p>
|
||||
*/
|
||||
@SuperBuilder
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
public class Check {
|
||||
|
||||
/**
|
||||
* The condition to evaluate.
|
||||
*/
|
||||
@NotNull
|
||||
@NotEmpty
|
||||
String condition;
|
||||
|
||||
/**
|
||||
* The message associated with this check, will be displayed when the condition evaluates to {@code false}.
|
||||
*/
|
||||
@NotEmpty
|
||||
String message;
|
||||
|
||||
/**
|
||||
* Defines the style of the message displayed in the UI when the condition evaluates to {@code false}.
|
||||
*/
|
||||
Style style = Style.INFO;
|
||||
|
||||
/**
|
||||
* The behavior to apply when the condition evaluates to {@code false}.
|
||||
*/
|
||||
Behavior behavior = Behavior.BLOCK_EXECUTION;
|
||||
|
||||
/**
|
||||
* The visual style used to display the message when the check fails.
|
||||
*/
|
||||
public enum Style {
|
||||
/**
|
||||
* Display the message as an error.
|
||||
*/
|
||||
ERROR,
|
||||
/**
|
||||
* Display the message as a success indicator.
|
||||
*/
|
||||
SUCCESS,
|
||||
/**
|
||||
* Display the message as a warning.
|
||||
*/
|
||||
WARNING,
|
||||
/**
|
||||
* Display the message as informational content.
|
||||
*/
|
||||
INFO;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines how the flow should behave when the condition evaluates to {@code false}.
|
||||
*/
|
||||
public enum Behavior {
|
||||
/**
|
||||
* Block the creation of the execution.
|
||||
*/
|
||||
BLOCK_EXECUTION,
|
||||
/**
|
||||
* Create the execution as failed.
|
||||
*/
|
||||
FAIL_EXECUTION,
|
||||
/**
|
||||
* Create a new execution as a result of the check failing.
|
||||
*/
|
||||
CREATE_EXECUTION;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves the effective behavior for a list of {@link Check}s based on priority.
|
||||
*
|
||||
* @param checks the list of checks whose behaviors are to be evaluated
|
||||
* @return the highest-priority behavior, or {@code CREATE_EXECUTION} if the list is empty or only contains nulls
|
||||
*/
|
||||
public static Check.Behavior resolveBehavior(List<Check> checks) {
|
||||
if (checks == null || checks.isEmpty()) {
|
||||
return Behavior.CREATE_EXECUTION;
|
||||
}
|
||||
|
||||
return checks.stream()
|
||||
.map(Check::getBehavior)
|
||||
.filter(Objects::nonNull).min(Comparator.comparingInt(Enum::ordinal))
|
||||
.orElse(Behavior.CREATE_EXECUTION);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,10 +1,12 @@
|
||||
package io.kestra.core.models.flows.input;
|
||||
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Represents an input along with its associated value and validation state.
|
||||
*
|
||||
@@ -12,15 +14,15 @@ import jakarta.validation.constraints.NotNull;
|
||||
* @param value The provided value for the input.
|
||||
* @param enabled {@code true} if the input is enabled; {@code false} otherwise.
|
||||
* @param isDefault {@code true} if the provided value is the default; {@code false} otherwise.
|
||||
* @param exception The validation exception, if the input value is invalid; {@code null} otherwise.
|
||||
* @param exceptions The validation exceptions, if the input value is invalid; {@code null} otherwise.
|
||||
*/
|
||||
public record InputAndValue(
|
||||
Input<?> input,
|
||||
Object value,
|
||||
boolean enabled,
|
||||
boolean isDefault,
|
||||
ConstraintViolationException exception) {
|
||||
|
||||
Set<InputOutputValidationException> exceptions) {
|
||||
|
||||
/**
|
||||
* Creates a new {@link InputAndValue} instance.
|
||||
*
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.validations.Regex;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.ConstraintViolation;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Builder;
|
||||
@@ -14,10 +15,7 @@ import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
|
||||
@SuperBuilder
|
||||
@@ -77,30 +75,35 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
|
||||
|
||||
@Override
|
||||
public void validate(List<String> inputs) throws ConstraintViolationException {
|
||||
Set<ConstraintViolation<?>> violations = new HashSet<>();
|
||||
|
||||
if (values != null && options != null) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
violations.add( ManualConstraintViolation.of(
|
||||
"you can't define both `values` and `options`",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
""
|
||||
);
|
||||
));
|
||||
}
|
||||
|
||||
if (!this.getAllowCustomValue()) {
|
||||
for (String input : inputs) {
|
||||
List<@Regex String> finalValues = this.values != null ? this.values : this.options;
|
||||
if (!finalValues.contains(input)) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"it must match the values `" + finalValues + "`",
|
||||
violations.add(ManualConstraintViolation.of(
|
||||
"value `" + input + "` doesn't match the values `" + finalValues + "`",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
input
|
||||
);
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!violations.isEmpty()) {
|
||||
throw ManualConstraintViolation.toConstraintViolationException(violations);
|
||||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} **/
|
||||
@@ -145,7 +148,7 @@ public class MultiselectInput extends Input<List<String>> implements ItemTypeInt
|
||||
|
||||
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid expression result. Expected a list of strings, but received " + type,
|
||||
"Invalid expression result. Expected a list of strings",
|
||||
this,
|
||||
MultiselectInput.class,
|
||||
getId(),
|
||||
|
||||
@@ -125,7 +125,7 @@ public class SelectInput extends Input<String> implements RenderableInput {
|
||||
|
||||
String type = Optional.ofNullable(result).map(Object::getClass).map(Class::getSimpleName).orElse("<null>");
|
||||
throw ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid expression result. Expected a list of strings, but received " + type,
|
||||
"Invalid expression result. Expected a list of strings",
|
||||
this,
|
||||
SelectInput.class,
|
||||
getId(),
|
||||
|
||||
@@ -20,7 +20,6 @@ import java.util.Optional;
|
||||
@Slf4j
|
||||
@Getter
|
||||
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
|
||||
@AllArgsConstructor
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
public class PersistedKvMetadata implements DeletedInterface, TenantInterface, HasUID {
|
||||
@@ -54,6 +53,19 @@ public class PersistedKvMetadata implements DeletedInterface, TenantInterface, H
|
||||
|
||||
private boolean deleted;
|
||||
|
||||
public PersistedKvMetadata(String tenantId, String namespace, String name, String description, Integer version, boolean last, @Nullable Instant expirationDate, @Nullable Instant created, @Nullable Instant updated, boolean deleted) {
|
||||
this.tenantId = tenantId;
|
||||
this.namespace = namespace;
|
||||
this.name = name;
|
||||
this.description = description;
|
||||
this.version = version;
|
||||
this.last = last;
|
||||
this.expirationDate = expirationDate;
|
||||
this.created = Optional.ofNullable(created).orElse(Instant.now());
|
||||
this.updated = updated;
|
||||
this.deleted = deleted;
|
||||
}
|
||||
|
||||
public static PersistedKvMetadata from(String tenantId, KVEntry kvEntry) {
|
||||
return PersistedKvMetadata.builder()
|
||||
.tenantId(tenantId)
|
||||
@@ -68,12 +80,15 @@ public class PersistedKvMetadata implements DeletedInterface, TenantInterface, H
|
||||
}
|
||||
|
||||
public PersistedKvMetadata asLast() {
|
||||
Instant saveDate = Instant.now();
|
||||
return this.toBuilder().created(Optional.ofNullable(this.created).orElse(saveDate)).updated(saveDate).last(true).build();
|
||||
return this.toBuilder().updated(Instant.now()).last(true).build();
|
||||
}
|
||||
|
||||
public PersistedKvMetadata toDeleted() {
|
||||
return this.toBuilder().updated(Instant.now()).deleted(true).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return IdUtils.fromParts(getTenantId(), getNamespace(), getName(), getVersion().toString());
|
||||
return IdUtils.fromParts(getTenantId(), getNamespace(), getName(), String.valueOf(getVersion()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,132 @@
|
||||
package io.kestra.core.models.namespaces.files;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import io.kestra.core.models.DeletedInterface;
|
||||
import io.kestra.core.models.HasUID;
|
||||
import io.kestra.core.models.TenantInterface;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.NamespaceFile;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.swagger.v3.oas.annotations.Hidden;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
import lombok.*;
|
||||
import lombok.experimental.FieldDefaults;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
@Builder(toBuilder = true)
|
||||
@Slf4j
|
||||
@Getter
|
||||
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
public class NamespaceFileMetadata implements DeletedInterface, TenantInterface, HasUID {
|
||||
@With
|
||||
@Hidden
|
||||
@Pattern(regexp = "^[a-z0-9][a-z0-9_-]*")
|
||||
private String tenantId;
|
||||
|
||||
@NotNull
|
||||
private String namespace;
|
||||
|
||||
@NotNull
|
||||
private String path;
|
||||
|
||||
private String parentPath;
|
||||
|
||||
@NotNull
|
||||
private Integer version;
|
||||
|
||||
@Builder.Default
|
||||
private boolean last = true;
|
||||
|
||||
@NotNull
|
||||
private Long size;
|
||||
|
||||
@Builder.Default
|
||||
private Instant created = Instant.now();
|
||||
|
||||
@Nullable
|
||||
private Instant updated;
|
||||
|
||||
private boolean deleted;
|
||||
|
||||
@JsonCreator
|
||||
public NamespaceFileMetadata(String tenantId, String namespace, String path, String parentPath, Integer version, boolean last, Long size, Instant created, @Nullable Instant updated, boolean deleted) {
|
||||
this.tenantId = tenantId;
|
||||
this.namespace = namespace;
|
||||
this.path = path;
|
||||
this.parentPath = parentPath(path);
|
||||
this.version = version;
|
||||
this.last = last;
|
||||
this.size = size;
|
||||
this.created = created;
|
||||
this.updated = updated;
|
||||
this.deleted = deleted;
|
||||
}
|
||||
|
||||
public static String path(String path, boolean trailingSlash) {
|
||||
if (trailingSlash && !path.endsWith("/")) {
|
||||
return path + "/";
|
||||
} else if (!trailingSlash && path.endsWith("/")) {
|
||||
return path.substring(0, path.length() - 1);
|
||||
}
|
||||
return path;
|
||||
}
|
||||
|
||||
public String path(boolean trailingSlash) {
|
||||
return path(this.path, trailingSlash);
|
||||
}
|
||||
|
||||
public static String parentPath(String path) {
|
||||
String withoutTrailingSlash = path.endsWith("/") ? path.substring(0, path.length() - 1) : path;
|
||||
// The parent path can't be set, it's always computed
|
||||
return withoutTrailingSlash.contains("/") ?
|
||||
withoutTrailingSlash.substring(0, withoutTrailingSlash.lastIndexOf("/") + 1) :
|
||||
null;
|
||||
}
|
||||
|
||||
public static NamespaceFileMetadata of(String tenantId, NamespaceFile namespaceFile) {
|
||||
return NamespaceFileMetadata.builder()
|
||||
.tenantId(tenantId)
|
||||
.namespace(namespaceFile.namespace())
|
||||
.path(namespaceFile.path(true).toString())
|
||||
.version(namespaceFile.version())
|
||||
.build();
|
||||
}
|
||||
|
||||
public static NamespaceFileMetadata of(String tenantId, String namespace, String path, FileAttributes fileAttributes) {
|
||||
return NamespaceFileMetadata.builder()
|
||||
.tenantId(tenantId)
|
||||
.namespace(namespace)
|
||||
.path(path)
|
||||
.created(Instant.ofEpochMilli(fileAttributes.getCreationTime()))
|
||||
.updated(Instant.ofEpochMilli(fileAttributes.getLastModifiedTime()))
|
||||
.size(fileAttributes.getSize())
|
||||
.version(1)
|
||||
.build();
|
||||
}
|
||||
|
||||
public NamespaceFileMetadata asLast() {
|
||||
Instant saveDate = Instant.now();
|
||||
return this.toBuilder().updated(saveDate).last(true).build();
|
||||
}
|
||||
|
||||
public NamespaceFileMetadata toDeleted() {
|
||||
return this.toBuilder().deleted(true).updated(Instant.now()).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uid() {
|
||||
return IdUtils.fromParts(getTenantId(), getNamespace(), getPath(), String.valueOf(getVersion()));
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public boolean isDirectory() {
|
||||
return this.path.endsWith("/");
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.ser.std.StdSerializer;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextProperty;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
@@ -35,7 +36,6 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
|
||||
@JsonDeserialize(using = Property.PropertyDeserializer.class)
|
||||
@JsonSerialize(using = Property.PropertySerializer.class)
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor(access = AccessLevel.PACKAGE)
|
||||
@Schema(
|
||||
oneOf = {
|
||||
@@ -51,6 +51,7 @@ public class Property<T> {
|
||||
.copy()
|
||||
.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
|
||||
|
||||
private final boolean skipCache;
|
||||
private String expression;
|
||||
private T value;
|
||||
|
||||
@@ -60,13 +61,23 @@ public class Property<T> {
|
||||
@Deprecated
|
||||
// Note: when not used, this constructor would not be deleted but made private so it can only be used by ofExpression(String) and the deserializer
|
||||
public Property(String expression) {
|
||||
this.expression = expression;
|
||||
this(expression, false);
|
||||
}
|
||||
|
||||
private Property(String expression, boolean skipCache) {
|
||||
this.expression = expression;
|
||||
this.skipCache = skipCache;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use {@link #ofValue(Object)} instead.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
@Deprecated
|
||||
public Property(Map<?, ?> map) {
|
||||
try {
|
||||
expression = MAPPER.writeValueAsString(map);
|
||||
this.skipCache = false;
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
@@ -79,14 +90,11 @@ public class Property<T> {
|
||||
/**
|
||||
* Returns a new {@link Property} with no cached rendered value,
|
||||
* so that the next render will evaluate its original Pebble expression.
|
||||
* <p>
|
||||
* The returned property will still cache its rendered result.
|
||||
* To re-evaluate on a subsequent render, call {@code skipCache()} again.
|
||||
*
|
||||
* @return a new {@link Property} without a pre-rendered value
|
||||
*/
|
||||
public Property<T> skipCache() {
|
||||
return Property.ofExpression(expression);
|
||||
return new Property<>(expression, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -133,6 +141,7 @@ public class Property<T> {
|
||||
|
||||
/**
|
||||
* Build a new Property object with a Pebble expression.<br>
|
||||
* This property object will not cache its rendered value.
|
||||
* <p>
|
||||
* Use {@link #ofValue(Object)} to build a property with a value instead.
|
||||
*/
|
||||
@@ -142,15 +151,15 @@ public class Property<T> {
|
||||
throw new IllegalArgumentException("'expression' must be a valid Pebble expression");
|
||||
}
|
||||
|
||||
return new Property<>(expression);
|
||||
return new Property<>(expression, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Render a property then convert it to its target type.<br>
|
||||
* Render a property, then convert it to its target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class)
|
||||
* @see RunContextProperty#as(Class)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return as(property, context, clazz, Map.of());
|
||||
@@ -159,25 +168,57 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property with additional variables, then convert it to its target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#as(Class, Map)
|
||||
* @see RunContextProperty#as(Class, Map)
|
||||
*/
|
||||
public static <T> T as(Property<T> property, PropertyContext context, Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.value == null) {
|
||||
if (property.skipCache || property.value == null) {
|
||||
String rendered = context.render(property.expression, variables);
|
||||
property.value = MAPPER.convertValue(rendered, clazz);
|
||||
property.value = deserialize(rendered, clazz);
|
||||
}
|
||||
|
||||
return property.value;
|
||||
}
|
||||
|
||||
private static <T> T deserialize(Object rendered, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
try {
|
||||
return MAPPER.convertValue(rendered, clazz);
|
||||
} catch (IllegalArgumentException e) {
|
||||
if (rendered instanceof String str) {
|
||||
try {
|
||||
return MAPPER.readValue(str, clazz);
|
||||
} catch (JsonProcessingException ex) {
|
||||
throw new IllegalVariableEvaluationException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> T deserialize(Object rendered, JavaType type) throws IllegalVariableEvaluationException {
|
||||
try {
|
||||
return MAPPER.convertValue(rendered, type);
|
||||
} catch (IllegalArgumentException e) {
|
||||
if (rendered instanceof String str) {
|
||||
try {
|
||||
return MAPPER.readValue(str, type);
|
||||
} catch (JsonProcessingException ex) {
|
||||
throw new IllegalVariableEvaluationException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Render a property then convert it as a list of target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class)
|
||||
* @see RunContextProperty#asList(Class)
|
||||
*/
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz) throws IllegalVariableEvaluationException {
|
||||
return asList(property, context, itemClazz, Map.of());
|
||||
@@ -186,37 +227,39 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property with additional variables, then convert it as a list of target type.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asList(Class, Map)
|
||||
* @see RunContextProperty#asList(Class, Map)
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T, I> T asList(Property<T> property, PropertyContext context, Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.value == null) {
|
||||
if (property.skipCache || property.value == null) {
|
||||
JavaType type = MAPPER.getTypeFactory().constructCollectionLikeType(List.class, itemClazz);
|
||||
try {
|
||||
String trimmedExpression = property.expression.trim();
|
||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = MAPPER.readValue(context.render(property.expression, variables), type);
|
||||
}
|
||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||
else {
|
||||
List<?> asRawList = MAPPER.readValue(property.expression, List.class);
|
||||
property.value = (T) asRawList.stream()
|
||||
.map(throwFunction(item -> {
|
||||
if (item instanceof String str) {
|
||||
return MAPPER.convertValue(context.render(str, variables), itemClazz);
|
||||
} else if (item instanceof Map map) {
|
||||
return MAPPER.convertValue(context.render(map, variables), itemClazz);
|
||||
}
|
||||
return item;
|
||||
}))
|
||||
.toList();
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
String trimmedExpression = property.expression.trim();
|
||||
// We need to detect if the expression is already a list or if it's a pebble expression (for eg. referencing a variable containing a list).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a list.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = deserialize(context.render(property.expression, variables), type);
|
||||
}
|
||||
// Otherwise, if it's already a list, we read it as a list first then render it from run context which handle list rendering by rendering each item of the list
|
||||
else {
|
||||
List<?> asRawList = deserialize(property.expression, List.class);
|
||||
property.value = (T) asRawList.stream()
|
||||
.map(throwFunction(item -> {
|
||||
Object rendered = null;
|
||||
if (item instanceof String str) {
|
||||
rendered = context.render(str, variables);
|
||||
} else if (item instanceof Map map) {
|
||||
rendered = context.render(map, variables);
|
||||
}
|
||||
|
||||
if (rendered != null) {
|
||||
return deserialize(rendered, itemClazz);
|
||||
}
|
||||
|
||||
return item;
|
||||
}))
|
||||
.toList();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -226,9 +269,9 @@ public class Property<T> {
|
||||
/**
|
||||
* Render a property then convert it as a map of target types.<br>
|
||||
* <p>
|
||||
* This method is designed to be used only by the {@link io.kestra.core.runners.RunContextProperty}.
|
||||
* This method is designed to be used only by the {@link RunContextProperty}.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class)
|
||||
* @see RunContextProperty#asMap(Class, Class)
|
||||
*/
|
||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
|
||||
return asMap(property, runContext, keyClass, valueClass, Map.of());
|
||||
@@ -240,11 +283,11 @@ public class Property<T> {
|
||||
* This method is safe to be used as many times as you want as the rendering and conversion will be cached.
|
||||
* Warning, due to the caching mechanism, this method is not thread-safe.
|
||||
*
|
||||
* @see io.kestra.core.runners.RunContextProperty#asMap(Class, Class, Map)
|
||||
* @see RunContextProperty#asMap(Class, Class, Map)
|
||||
*/
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public static <T, K, V> T asMap(Property<T> property, RunContext runContext, Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
|
||||
if (property.value == null) {
|
||||
if (property.skipCache || property.value == null) {
|
||||
JavaType targetMapType = MAPPER.getTypeFactory().constructMapType(Map.class, keyClass, valueClass);
|
||||
|
||||
try {
|
||||
@@ -252,12 +295,12 @@ public class Property<T> {
|
||||
// We need to detect if the expression is already a map or if it's a pebble expression (for eg. referencing a variable containing a map).
|
||||
// Doing that allows us to, if it's an expression, first render then read it as a map.
|
||||
if (trimmedExpression.startsWith("{{") && trimmedExpression.endsWith("}}")) {
|
||||
property.value = MAPPER.readValue(runContext.render(property.expression, variables), targetMapType);
|
||||
property.value = deserialize(runContext.render(property.expression, variables), targetMapType);
|
||||
}
|
||||
// Otherwise if it's already a map we read it as a map first then render it from run context which handle map rendering by rendering each entry of the map (otherwise it will fail with nested expressions in values for eg.)
|
||||
else {
|
||||
Map asRawMap = MAPPER.readValue(property.expression, Map.class);
|
||||
property.value = MAPPER.convertValue(runContext.render(asRawMap, variables), targetMapType);
|
||||
property.value = deserialize(runContext.render(asRawMap, variables), targetMapType);
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalVariableEvaluationException(e);
|
||||
|
||||
@@ -4,10 +4,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.tasks.runners.TaskLogLineMatcher.TaskLogMatch;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
@@ -38,6 +36,7 @@ import static io.kestra.core.utils.Rethrow.throwConsumer;
|
||||
abstract public class PluginUtilsService {
|
||||
|
||||
private static final TypeReference<Map<String, String>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
|
||||
private static final TaskLogLineMatcher LOG_LINE_MATCHER = new TaskLogLineMatcher();
|
||||
|
||||
public static Map<String, String> createOutputFiles(
|
||||
Path tempDirectory,
|
||||
@@ -170,12 +169,9 @@ abstract public class PluginUtilsService {
|
||||
}
|
||||
|
||||
public static Map<String, Object> parseOut(String line, Logger logger, RunContext runContext, boolean isStdErr, Instant customInstant) {
|
||||
|
||||
TaskLogLineMatcher logLineMatcher = ((DefaultRunContext) runContext).getApplicationContext().getBean(TaskLogLineMatcher.class);
|
||||
|
||||
Map<String, Object> outputs = new HashMap<>();
|
||||
try {
|
||||
Optional<TaskLogMatch> matches = logLineMatcher.matches(line, logger, runContext, customInstant);
|
||||
Optional<TaskLogMatch> matches = LOG_LINE_MATCHER.matches(line, logger, runContext, customInstant);
|
||||
if (matches.isPresent()) {
|
||||
TaskLogMatch taskLogMatch = matches.get();
|
||||
outputs.putAll(taskLogMatch.outputs());
|
||||
@@ -215,8 +211,7 @@ abstract public class PluginUtilsService {
|
||||
realNamespace = runContext.render(namespace);
|
||||
realFlowId = runContext.render(flowId);
|
||||
// validate that the flow exists: a.k.a access is authorized by this namespace
|
||||
FlowService flowService = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowService.class);
|
||||
flowService.checkAllowedNamespace(flowInfo.tenantId(), realNamespace, flowInfo.tenantId(), flowInfo.namespace());
|
||||
runContext.acl().allowNamespace(realNamespace).check();
|
||||
} else if (namespace != null || flowId != null) {
|
||||
throw new IllegalArgumentException("Both `namespace` and `flowId` must be set when `executionId` is set.");
|
||||
} else {
|
||||
|
||||
@@ -27,7 +27,6 @@ import static io.kestra.core.runners.RunContextLogger.ORIGINAL_TIMESTAMP_KEY;
|
||||
* ::{"outputs":{"key":"value"}}::
|
||||
* }</pre>
|
||||
*/
|
||||
@Singleton
|
||||
public class TaskLogLineMatcher {
|
||||
|
||||
protected static final Pattern LOG_DATA_SYNTAX = Pattern.compile("^::(\\{.*})::$");
|
||||
@@ -108,4 +107,4 @@ public class TaskLogLineMatcher {
|
||||
String message
|
||||
) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,6 +82,12 @@ abstract public class AbstractTrigger implements TriggerInterface {
|
||||
@PluginProperty(hidden = true, group = PluginProperty.CORE_GROUP)
|
||||
private boolean failOnTriggerError = false;
|
||||
|
||||
@PluginProperty(group = PluginProperty.CORE_GROUP)
|
||||
@Schema(
|
||||
title = "Specifies whether a trigger is allowed to start a new execution even if a previous run is still in progress."
|
||||
)
|
||||
private boolean allowConcurrent = false;
|
||||
|
||||
/**
|
||||
* For backward compatibility: we rename minLogLevel to logLevel.
|
||||
* @deprecated use {@link #logLevel} instead
|
||||
|
||||
@@ -1,22 +1,37 @@
|
||||
package io.kestra.core.models.triggers;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.annotations.PluginProperty;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Map;
|
||||
|
||||
public interface Schedulable extends PollingTriggerInterface{
|
||||
String PLUGIN_PROPERTY_RECOVER_MISSED_SCHEDULES = "recoverMissedSchedules";
|
||||
|
||||
@Schema(
|
||||
title = "The inputs to pass to the scheduled flow"
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
Map<String, Object> getInputs();
|
||||
|
||||
@Schema(
|
||||
title = "Action to take in the case of missed schedules",
|
||||
description = "`ALL` will recover all missed schedules, `LAST` will only recovered the last missing one, `NONE` will not recover any missing schedule.\n" +
|
||||
"The default is `ALL` unless a different value is configured using the global plugin configuration."
|
||||
)
|
||||
@PluginProperty
|
||||
RecoverMissedSchedules getRecoverMissedSchedules();
|
||||
|
||||
/**
|
||||
* Compute the previous evaluation of a trigger.
|
||||
* This is used when a trigger misses some schedule to compute the next date to evaluate in the past.
|
||||
*/
|
||||
ZonedDateTime previousEvaluationDate(ConditionContext conditionContext) throws IllegalVariableEvaluationException;
|
||||
|
||||
RecoverMissedSchedules getRecoverMissedSchedules();
|
||||
|
||||
|
||||
/**
|
||||
* Load the default RecoverMissedSchedules from plugin property, or else ALL.
|
||||
*/
|
||||
|
||||
@@ -172,7 +172,7 @@ public class Trigger extends TriggerContext implements HasUID {
|
||||
|
||||
if (abstractTrigger instanceof PollingTriggerInterface pollingTriggerInterface) {
|
||||
try {
|
||||
nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, Optional.empty());
|
||||
nextDate = pollingTriggerInterface.nextEvaluationDate(conditionContext, lastTrigger);
|
||||
} catch (InvalidTriggerConfigurationException e) {
|
||||
disabled = true;
|
||||
}
|
||||
|
||||
@@ -6,12 +6,9 @@ import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.ExecutionTrigger;
|
||||
import io.kestra.core.models.tasks.Output;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.FlowInputOutput;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.*;
|
||||
|
||||
public abstract class TriggerService {
|
||||
@@ -51,58 +48,6 @@ public abstract class TriggerService {
|
||||
return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext);
|
||||
}
|
||||
|
||||
public static Execution generateScheduledExecution(
|
||||
AbstractTrigger trigger,
|
||||
ConditionContext conditionContext,
|
||||
TriggerContext context,
|
||||
List<Label> labels,
|
||||
Map<String, Object> inputs,
|
||||
Map<String, Object> variables,
|
||||
Optional<ZonedDateTime> scheduleDate
|
||||
) {
|
||||
RunContext runContext = conditionContext.getRunContext();
|
||||
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);
|
||||
|
||||
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(labels));
|
||||
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
||||
// add a correlation ID if none exist
|
||||
executionLabels.add(new Label(Label.CORRELATION_ID, runContext.getTriggerExecutionId()));
|
||||
}
|
||||
Execution execution = Execution.builder()
|
||||
.id(runContext.getTriggerExecutionId())
|
||||
.tenantId(context.getTenantId())
|
||||
.namespace(context.getNamespace())
|
||||
.flowId(context.getFlowId())
|
||||
.flowRevision(conditionContext.getFlow().getRevision())
|
||||
.variables(conditionContext.getFlow().getVariables())
|
||||
.labels(executionLabels)
|
||||
.state(new State())
|
||||
.trigger(executionTrigger)
|
||||
.scheduleDate(scheduleDate.map(date -> date.toInstant()).orElse(null))
|
||||
.build();
|
||||
|
||||
Map<String, Object> allInputs = new HashMap<>();
|
||||
// add flow inputs with default value
|
||||
var flow = conditionContext.getFlow();
|
||||
if (flow.getInputs() != null) {
|
||||
flow.getInputs().stream()
|
||||
.filter(input -> input.getDefaults() != null)
|
||||
.forEach(input -> allInputs.put(input.getId(), input.getDefaults()));
|
||||
}
|
||||
|
||||
if (inputs != null) {
|
||||
allInputs.putAll(inputs);
|
||||
}
|
||||
|
||||
// add inputs and inject defaults
|
||||
if (!allInputs.isEmpty()) {
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
|
||||
execution = execution.withInputs(flowInputOutput.readExecutionInputs(conditionContext.getFlow(), execution, allInputs));
|
||||
}
|
||||
|
||||
return execution;
|
||||
}
|
||||
|
||||
private static Execution generateExecution(
|
||||
String id,
|
||||
AbstractTrigger trigger,
|
||||
@@ -111,6 +56,7 @@ public abstract class TriggerService {
|
||||
ConditionContext conditionContext
|
||||
) {
|
||||
List<Label> executionLabels = new ArrayList<>(ListUtils.emptyOnNull(trigger.getLabels()));
|
||||
executionLabels.add(new Label(Label.FROM, "trigger"));
|
||||
if (executionLabels.stream().noneMatch(label -> Label.CORRELATION_ID.equals(label.key()))) {
|
||||
// add a correlation ID if none exist
|
||||
executionLabels.add(new Label(Label.CORRELATION_ID, id));
|
||||
|
||||
@@ -67,6 +67,11 @@ public class ManualConstraintViolation<T> implements ConstraintViolation<T> {
|
||||
invalidValue
|
||||
)));
|
||||
}
|
||||
public static <T> ConstraintViolationException toConstraintViolationException(
|
||||
Set<? extends ConstraintViolation<?>> constraintViolations
|
||||
) {
|
||||
return new ConstraintViolationException(constraintViolations);
|
||||
}
|
||||
|
||||
public String getMessageTemplate() {
|
||||
return "{messageTemplate}";
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
package io.kestra.core.plugins.notifications;
|
||||
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public interface ExecutionInterface {
|
||||
@Schema(
|
||||
title = "The execution id to use",
|
||||
description = "Default is the current execution, " +
|
||||
"change it to {{ trigger.executionId }} if you use this task with a Flow trigger to use the original execution."
|
||||
)
|
||||
Property<String> getExecutionId();
|
||||
|
||||
@Schema(
|
||||
title = "Custom fields to be added on notification"
|
||||
)
|
||||
Property<Map<String, Object>> getCustomFields();
|
||||
|
||||
@Schema(
|
||||
title = "Custom message to be added on notification"
|
||||
)
|
||||
Property<String> getCustomMessage();
|
||||
}
|
||||
@@ -0,0 +1,140 @@
|
||||
package io.kestra.core.plugins.notifications;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.retrys.Exponential;
|
||||
import io.kestra.core.repositories.ExecutionRepositoryInterface;
|
||||
import io.kestra.core.runners.DefaultRunContext;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.RetryUtils;
|
||||
import io.kestra.core.utils.UriProvider;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
|
||||
public final class ExecutionService {
|
||||
private ExecutionService() {}
|
||||
|
||||
public static Execution findExecution(RunContext runContext, Property<String> executionId) throws IllegalVariableEvaluationException, NoSuchElementException {
|
||||
ExecutionRepositoryInterface executionRepository = ((DefaultRunContext) runContext).getApplicationContext().getBean(ExecutionRepositoryInterface.class);
|
||||
|
||||
RetryUtils.Instance<Execution, NoSuchElementException> retryInstance = RetryUtils
|
||||
.of(Exponential.builder()
|
||||
.delayFactor(2.0)
|
||||
.interval(Duration.ofSeconds(1))
|
||||
.maxInterval(Duration.ofSeconds(15))
|
||||
.maxAttempts(-1)
|
||||
.maxDuration(Duration.ofMinutes(10))
|
||||
.build(),
|
||||
runContext.logger()
|
||||
);
|
||||
|
||||
var executionRendererId = runContext.render(executionId).as(String.class).orElse(null);
|
||||
var flowTriggerExecutionState = getOptionalFlowTriggerExecutionState(runContext);
|
||||
|
||||
var flowVars = (Map<String, String>) runContext.getVariables().get("flow");
|
||||
var isCurrentExecution = isCurrentExecution(runContext, executionRendererId);
|
||||
if (isCurrentExecution) {
|
||||
runContext.logger().info("Loading execution data for the current execution.");
|
||||
}
|
||||
|
||||
return retryInstance.run(
|
||||
NoSuchElementException.class,
|
||||
() -> executionRepository.findById(flowVars.get("tenantId"), executionRendererId)
|
||||
.filter(foundExecution -> isExecutionInTheWantedState(foundExecution, isCurrentExecution, flowTriggerExecutionState))
|
||||
.orElseThrow(() -> new NoSuchElementException("Unable to find execution '" + executionRendererId + "'"))
|
||||
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* ExecutionRepository can be out of sync in ElasticSearch stack, with this filter we try to mitigate that
|
||||
*
|
||||
* @param execution the Execution we fetched from ExecutionRepository
|
||||
* @param isCurrentExecution true if this *Execution Task is configured to send a notification for the current Execution
|
||||
* @param flowTriggerExecutionState the Execution State that triggered the Flow trigger, if any
|
||||
* @return true if we think we fetched the right Execution data for our usecase
|
||||
*/
|
||||
public static boolean isExecutionInTheWantedState(Execution execution, boolean isCurrentExecution, Optional<String> flowTriggerExecutionState) {
|
||||
if (isCurrentExecution) {
|
||||
// we don't wait for current execution to be terminated as it could not be possible as long as this task is running
|
||||
return true;
|
||||
}
|
||||
|
||||
if (flowTriggerExecutionState.isPresent()) {
|
||||
// we were triggered by a Flow trigger that can be, for example: PAUSED
|
||||
if (flowTriggerExecutionState.get().equals(State.Type.RUNNING.toString())) {
|
||||
// RUNNING special case: we take the first state we got
|
||||
return true;
|
||||
} else {
|
||||
// to handle the case where the ExecutionRepository is out of sync in ElasticSearch stack,
|
||||
// we try to match an Execution with the same state
|
||||
return execution.getState().getCurrent().name().equals(flowTriggerExecutionState.get());
|
||||
}
|
||||
} else {
|
||||
return execution.getState().getCurrent().isTerminated();
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, Object> executionMap(RunContext runContext, ExecutionInterface executionInterface) throws IllegalVariableEvaluationException {
|
||||
Execution execution = findExecution(runContext, executionInterface.getExecutionId());
|
||||
UriProvider uriProvider = ((DefaultRunContext) runContext).getApplicationContext().getBean(UriProvider.class);
|
||||
|
||||
Map<String, Object> templateRenderMap = new HashMap<>();
|
||||
templateRenderMap.put("duration", execution.getState().humanDuration());
|
||||
templateRenderMap.put("startDate", execution.getState().getStartDate());
|
||||
templateRenderMap.put("link", uriProvider.executionUrl(execution));
|
||||
templateRenderMap.put("execution", JacksonMapper.toMap(execution));
|
||||
|
||||
runContext.render(executionInterface.getCustomMessage())
|
||||
.as(String.class)
|
||||
.ifPresent(s -> templateRenderMap.put("customMessage", s));
|
||||
|
||||
final Map<String, Object> renderedCustomFields = runContext.render(executionInterface.getCustomFields()).asMap(String.class, Object.class);
|
||||
if (!renderedCustomFields.isEmpty()) {
|
||||
templateRenderMap.put("customFields", renderedCustomFields);
|
||||
}
|
||||
|
||||
var isCurrentExecution = isCurrentExecution(runContext, execution.getId());
|
||||
|
||||
List<TaskRun> taskRuns;
|
||||
|
||||
if (isCurrentExecution) {
|
||||
taskRuns = execution.getTaskRunList();
|
||||
} else {
|
||||
taskRuns = execution.getTaskRunList().stream()
|
||||
.filter(t -> (execution.hasFailed() ? State.Type.FAILED : State.Type.SUCCESS).equals(t.getState().getCurrent()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
if (!ListUtils.isEmpty(taskRuns)) {
|
||||
TaskRun lastTaskRun = taskRuns.getLast();
|
||||
templateRenderMap.put("firstFailed", State.Type.FAILED.equals(lastTaskRun.getState().getCurrent()) ? lastTaskRun : false);
|
||||
templateRenderMap.put("lastTask", lastTaskRun);
|
||||
}
|
||||
|
||||
return templateRenderMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* if there is a state, we assume this is a Flow trigger with type: {@link io.kestra.plugin.core.trigger.Flow.Output}
|
||||
*
|
||||
* @return the state of the execution that triggered the Flow trigger, or empty if another usecase/trigger
|
||||
*/
|
||||
private static Optional<String> getOptionalFlowTriggerExecutionState(RunContext runContext) {
|
||||
var triggerVar = Optional.ofNullable(
|
||||
runContext.getVariables().get("trigger")
|
||||
);
|
||||
return triggerVar.map(trigger -> ((Map<String, String>) trigger).get("state"));
|
||||
}
|
||||
|
||||
private static boolean isCurrentExecution(RunContext runContext, String executionId) {
|
||||
var executionVars = (Map<String, String>) runContext.getVariables().get("execution");
|
||||
return executionId.equals(executionVars.get("id"));
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,7 @@ import io.kestra.core.models.dashboards.charts.DataChart;
|
||||
import io.kestra.core.plugins.DefaultPluginRegistry;
|
||||
import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.micronaut.context.exceptions.NoSuchBeanException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -72,7 +73,7 @@ public final class PluginDeserializer<T extends Plugin> extends JsonDeserializer
|
||||
// By default, if no plugin-registry is configured retrieve
|
||||
// the one configured from the static Kestra's context.
|
||||
pluginRegistry = KestraContext.getContext().getPluginRegistry();
|
||||
} catch (IllegalStateException ignore) {
|
||||
} catch (IllegalStateException | NoSuchBeanException ignore) {
|
||||
// This error can only happen if the KestraContext is not initialized (i.e. in unit tests).
|
||||
log.error("No plugin registry was initialized. Use default implementation.");
|
||||
pluginRegistry = DefaultPluginRegistry.getOrCreate();
|
||||
|
||||
@@ -2,7 +2,6 @@ package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.executions.statistics.DailyExecutionStatistics;
|
||||
import io.kestra.core.models.executions.statistics.ExecutionCount;
|
||||
import io.kestra.core.models.executions.statistics.Flow;
|
||||
@@ -94,6 +93,8 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
|
||||
|
||||
Flux<Execution> findAllAsync(@Nullable String tenantId);
|
||||
|
||||
Flux<Execution> findAsync(String tenantId, List<QueryFilter> filters);
|
||||
|
||||
Execution delete(Execution execution);
|
||||
|
||||
Integer purge(Execution execution);
|
||||
|
||||
@@ -8,6 +8,7 @@ import io.kestra.plugin.core.dashboard.data.Flows;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@@ -158,6 +159,8 @@ public interface FlowRepositoryInterface extends QueryBuilderInterface<Flows.Fie
|
||||
.toList();
|
||||
}
|
||||
|
||||
Flux<Flow> findAsync(String tenantId, List<QueryFilter> filters);
|
||||
|
||||
FlowWithSource create(GenericFlow flow);
|
||||
|
||||
FlowWithSource update(GenericFlow flow, FlowInterface previous) throws ConstraintViolationException;
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.models.FetchVersion;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.namespaces.files.NamespaceFileMetadata;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface NamespaceFileMetadataRepositoryInterface extends SaveRepositoryInterface<NamespaceFileMetadata> {
|
||||
Optional<NamespaceFileMetadata> findByPath(
|
||||
String tenantId,
|
||||
String namespace,
|
||||
String path
|
||||
) throws IOException;
|
||||
|
||||
default ArrayListTotal<NamespaceFileMetadata> find(
|
||||
Pageable pageable,
|
||||
String tenantId,
|
||||
List<QueryFilter> filters,
|
||||
boolean allowDeleted
|
||||
) {
|
||||
return this.find(pageable, tenantId, filters, allowDeleted, FetchVersion.LATEST);
|
||||
}
|
||||
|
||||
ArrayListTotal<NamespaceFileMetadata> find(
|
||||
Pageable pageable,
|
||||
String tenantId,
|
||||
List<QueryFilter> filters,
|
||||
boolean allowDeleted,
|
||||
FetchVersion fetchBehavior
|
||||
);
|
||||
|
||||
default NamespaceFileMetadata delete(NamespaceFileMetadata namespaceFileMetadata) throws IOException {
|
||||
return this.save(namespaceFileMetadata.toBuilder().deleted(true).build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Purge (hard delete) a list of namespace files metadata. If no version is specified, all versions are purged.
|
||||
* @param namespaceFilesMetadata the list of namespace files metadata to purge
|
||||
* @return the number of purged namespace files metadata
|
||||
*/
|
||||
Integer purge(List<NamespaceFileMetadata> namespaceFilesMetadata);
|
||||
}
|
||||
@@ -1,10 +1,10 @@
|
||||
package io.kestra.core.repositories;
|
||||
|
||||
import io.kestra.core.models.Setting;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
public interface SettingRepositoryInterface {
|
||||
Optional<Setting> findByKey(String key);
|
||||
@@ -13,5 +13,7 @@ public interface SettingRepositoryInterface {
|
||||
|
||||
Setting save(Setting setting) throws ConstraintViolationException;
|
||||
|
||||
Setting internalSave(Setting setting) throws ConstraintViolationException;
|
||||
|
||||
Setting delete(Setting setting);
|
||||
}
|
||||
|
||||
@@ -16,8 +16,8 @@ import java.util.function.Function;
|
||||
public interface TriggerRepositoryInterface extends QueryBuilderInterface<Triggers.Fields> {
|
||||
Optional<Trigger> findLast(TriggerContext trigger);
|
||||
|
||||
Optional<Trigger> findByExecution(Execution execution);
|
||||
|
||||
Optional<Trigger> findByUid(String uid);
|
||||
|
||||
List<Trigger> findAll(String tenantId);
|
||||
|
||||
List<Trigger> findAllForAllTenants();
|
||||
@@ -43,9 +43,9 @@ public interface TriggerRepositoryInterface extends QueryBuilderInterface<Trigge
|
||||
|
||||
/**
|
||||
* Find all triggers that match the query, return a flux of triggers
|
||||
* as the search is not paginated
|
||||
*/
|
||||
Flux<Trigger> find(String tenantId, List<QueryFilter> filters);
|
||||
Flux<Trigger> findAsync(String tenantId, List<QueryFilter> filters);
|
||||
|
||||
|
||||
default Function<String, String> sortMapping() throws IllegalArgumentException {
|
||||
return Function.identity();
|
||||
|
||||
50
core/src/main/java/io/kestra/core/runners/AclChecker.java
Normal file
50
core/src/main/java/io/kestra/core/runners/AclChecker.java
Normal file
@@ -0,0 +1,50 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import javax.annotation.CheckReturnValue;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Check if the current taskrun has access to the requested resources.
|
||||
*
|
||||
* <p>
|
||||
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
|
||||
*
|
||||
* @see AllowedResources
|
||||
*/
|
||||
public interface AclChecker {
|
||||
|
||||
/**Tasks that need to access resources outside their namespace should use this interface to check ACL (Allowed namespaces in EE).
|
||||
* Allow all namespaces.
|
||||
* <p>
|
||||
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
|
||||
*/
|
||||
@CheckReturnValue
|
||||
AllowedResources allowAllNamespaces();
|
||||
|
||||
/**
|
||||
* Allow only the given namespace.
|
||||
* <p>
|
||||
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
|
||||
*/
|
||||
@CheckReturnValue
|
||||
AllowedResources allowNamespace(String namespace);
|
||||
|
||||
/**
|
||||
* Allow only the given namespaces.
|
||||
* <p>
|
||||
* IMPORTANT: remember to call the <code>check()</code> method to check the ACL.
|
||||
*/
|
||||
@CheckReturnValue
|
||||
AllowedResources allowNamespaces(List<String> namespaces);
|
||||
|
||||
/**
|
||||
* Represents a set of allowed resources.
|
||||
* Tasks that need to access resources outside their namespace should call the <code>check()</code> method to check the ACL (Allowed namespaces in EE).
|
||||
*/
|
||||
interface AllowedResources {
|
||||
/**
|
||||
* Check if the current taskrun has access to the requested resources.
|
||||
*/
|
||||
void check();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,86 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.services.NamespaceService;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
class AclCheckerImpl implements AclChecker {
|
||||
private final NamespaceService namespaceService;
|
||||
private final RunContext.FlowInfo flowInfo;
|
||||
|
||||
AclCheckerImpl(ApplicationContext applicationContext, RunContext.FlowInfo flowInfo) {
|
||||
this.namespaceService = applicationContext.getBean(NamespaceService.class);
|
||||
this.flowInfo = flowInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AllowedResources allowAllNamespaces() {
|
||||
return new AllowAllNamespaces(flowInfo, namespaceService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AllowedResources allowNamespace(String namespace) {
|
||||
return new AllowNamespace(flowInfo, namespaceService, namespace);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AllowedResources allowNamespaces(List<String> namespaces) {
|
||||
return new AllowNamespaces(flowInfo, namespaceService, namespaces);
|
||||
}
|
||||
|
||||
|
||||
static class AllowAllNamespaces implements AllowedResources {
|
||||
private final RunContext.FlowInfo flowInfo;
|
||||
private final NamespaceService namespaceService;
|
||||
|
||||
AllowAllNamespaces(RunContext.FlowInfo flowInfo, NamespaceService namespaceService) {
|
||||
this.flowInfo = Objects.requireNonNull(flowInfo);
|
||||
this.namespaceService = Objects.requireNonNull(namespaceService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void check() {
|
||||
this.namespaceService.checkAllowedAllNamespaces(flowInfo.tenantId(), flowInfo.tenantId(), flowInfo.namespace());
|
||||
}
|
||||
}
|
||||
|
||||
static class AllowNamespace implements AllowedResources {
|
||||
private final RunContext.FlowInfo flowInfo;
|
||||
private final NamespaceService namespaceService;
|
||||
private final String namespace;
|
||||
|
||||
public AllowNamespace(RunContext.FlowInfo flowInfo, NamespaceService namespaceService, String namespace) {
|
||||
this.flowInfo = Objects.requireNonNull(flowInfo);
|
||||
this.namespaceService = Objects.requireNonNull(namespaceService);
|
||||
this.namespace = Objects.requireNonNull(namespace);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void check() {
|
||||
namespaceService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace());
|
||||
}
|
||||
}
|
||||
|
||||
static class AllowNamespaces implements AllowedResources {
|
||||
private final RunContext.FlowInfo flowInfo;
|
||||
private final NamespaceService namespaceService;
|
||||
private final List<String> namespaces;
|
||||
|
||||
AllowNamespaces(RunContext.FlowInfo flowInfo, NamespaceService namespaceService, List<String> namespaces) {
|
||||
this.flowInfo = Objects.requireNonNull(flowInfo);
|
||||
this.namespaceService = Objects.requireNonNull(namespaceService);
|
||||
this.namespaces = Objects.requireNonNull(namespaces);
|
||||
|
||||
if (namespaces.isEmpty()) {
|
||||
throw new IllegalArgumentException("At least one namespace must be provided");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void check() {
|
||||
namespaces.forEach(namespace -> namespaceService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,10 +6,12 @@ import com.google.common.base.CaseFormat;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.metrics.MetricRegistry;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.plugins.PluginConfigurations;
|
||||
import io.kestra.core.services.KVStoreService;
|
||||
import io.kestra.core.storages.Storage;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -123,7 +125,12 @@ public class DefaultRunContext extends RunContext {
|
||||
this.traceParent = traceParent;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Plugin should not use the ApplicationContext anymore, and neither should they cast to this implementation.
|
||||
* Plugin should instead rely on supported API only.
|
||||
*/
|
||||
@JsonIgnore
|
||||
@Deprecated(since = "1.2.0", forRemoval = true)
|
||||
public ApplicationContext getApplicationContext() {
|
||||
return applicationContext;
|
||||
}
|
||||
@@ -230,6 +237,14 @@ public class DefaultRunContext extends RunContext {
|
||||
return runContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RunContext cloneForPlugin(Plugin plugin) {
|
||||
PluginConfigurations pluginConfigurations = applicationContext.getBean(PluginConfigurations.class);
|
||||
DefaultRunContext runContext = clone();
|
||||
runContext.pluginConfiguration = pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass());
|
||||
return runContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@@ -574,11 +589,21 @@ public class DefaultRunContext extends RunContext {
|
||||
return isInitialized.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AclChecker acl() {
|
||||
return new AclCheckerImpl(this.applicationContext, flowInfo());
|
||||
}
|
||||
|
||||
@Override
|
||||
public LocalPath localPath() {
|
||||
return localPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputAndOutput inputAndOutput() {
|
||||
return new InputAndOutputImpl(this.applicationContext, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder class for constructing new {@link DefaultRunContext} objects.
|
||||
*/
|
||||
|
||||
@@ -53,12 +53,10 @@ public final class ExecutableUtils {
|
||||
}
|
||||
|
||||
public static SubflowExecutionResult subflowExecutionResult(TaskRun parentTaskrun, Execution execution) {
|
||||
List<TaskRunAttempt> attempts = parentTaskrun.getAttempts() == null ? new ArrayList<>() : new ArrayList<>(parentTaskrun.getAttempts());
|
||||
attempts.add(TaskRunAttempt.builder().state(parentTaskrun.getState()).build());
|
||||
return SubflowExecutionResult.builder()
|
||||
.executionId(execution.getId())
|
||||
.state(parentTaskrun.getState().getCurrent())
|
||||
.parentTaskRun(parentTaskrun.withAttempts(attempts))
|
||||
.parentTaskRun(parentTaskrun.addAttempt(TaskRunAttempt.builder().state(parentTaskrun.getState()).build()))
|
||||
.build();
|
||||
}
|
||||
|
||||
@@ -191,12 +189,11 @@ public final class ExecutableUtils {
|
||||
variables.put("taskRunIteration", currentTaskRun.getIteration());
|
||||
}
|
||||
|
||||
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
|
||||
Instant scheduleOnDate = runContext.render(scheduleDate).as(ZonedDateTime.class).map(date -> date.toInstant()).orElse(null);
|
||||
Execution execution = Execution
|
||||
.newExecution(
|
||||
flow,
|
||||
(f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
|
||||
(f, e) -> runContext.inputAndOutput().readInputs(f, e, inputs),
|
||||
newLabels,
|
||||
Optional.empty())
|
||||
.withTrigger(ExecutionTrigger.builder()
|
||||
|
||||
@@ -3,13 +3,13 @@ package io.kestra.core.runners;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.exceptions.KestraRuntimeException;
|
||||
|
||||
import io.kestra.core.exceptions.InputOutputValidationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Data;
|
||||
import io.kestra.core.models.flows.DependsOn;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Input;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
import io.kestra.core.models.flows.RenderableInput;
|
||||
import io.kestra.core.models.flows.Type;
|
||||
import io.kestra.core.models.flows.input.FileInput;
|
||||
@@ -19,7 +19,6 @@ import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.property.URIFetcher;
|
||||
import io.kestra.core.models.tasks.common.EncryptedString;
|
||||
import io.kestra.core.models.validations.ManualConstraintViolation;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
@@ -158,11 +157,7 @@ public class FlowInputOutput {
|
||||
File tempFile = File.createTempFile(prefix, fileExtension);
|
||||
try (var inputStream = fileUpload.getInputStream();
|
||||
var outputStream = new FileOutputStream(tempFile)) {
|
||||
long transferredBytes = inputStream.transferTo(outputStream);
|
||||
if (transferredBytes == 0) {
|
||||
sink.error(new KestraRuntimeException("Can't upload file: " + fileUpload.getFilename()));
|
||||
return;
|
||||
}
|
||||
inputStream.transferTo(outputStream);
|
||||
URI from = storageInterface.from(execution, inputId, fileName, tempFile);
|
||||
sink.next(Map.entry(inputId, from.toString()));
|
||||
} finally {
|
||||
@@ -213,8 +208,8 @@ public class FlowInputOutput {
|
||||
.filter(InputAndValue::enabled)
|
||||
.map(it -> {
|
||||
//TODO check to return all exception at-once.
|
||||
if (it.exception() != null) {
|
||||
throw it.exception();
|
||||
if (it.exceptions() != null && !it.exceptions().isEmpty()) {
|
||||
throw InputOutputValidationException.merge(it.exceptions());
|
||||
}
|
||||
return new AbstractMap.SimpleEntry<>(it.input().getId(), it.value());
|
||||
})
|
||||
@@ -298,13 +293,9 @@ public class FlowInputOutput {
|
||||
try {
|
||||
isInputEnabled = Boolean.TRUE.equals(runContext.renderTyped(dependsOnCondition.get()));
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
resolvable.resolveWithError(ManualConstraintViolation.toConstraintViolationException(
|
||||
"Invalid condition: " + e.getMessage(),
|
||||
input,
|
||||
(Class<Input>)input.getClass(),
|
||||
input.getId(),
|
||||
this
|
||||
));
|
||||
resolvable.resolveWithError(
|
||||
InputOutputValidationException.of("Invalid condition: " + e.getMessage())
|
||||
);
|
||||
isInputEnabled = false;
|
||||
}
|
||||
}
|
||||
@@ -337,7 +328,7 @@ public class FlowInputOutput {
|
||||
// validate and parse input value
|
||||
if (value == null) {
|
||||
if (input.getRequired()) {
|
||||
resolvable.resolveWithError(input.toConstraintViolationException("missing required input", null));
|
||||
resolvable.resolveWithError(InputOutputValidationException.of("Missing required input:" + input.getId()));
|
||||
} else {
|
||||
resolvable.resolveWithValue(null);
|
||||
}
|
||||
@@ -347,17 +338,18 @@ public class FlowInputOutput {
|
||||
parsedInput.ifPresent(parsed -> ((Input) resolvable.get().input()).validate(parsed.getValue()));
|
||||
parsedInput.ifPresent(typed -> resolvable.resolveWithValue(typed.getValue()));
|
||||
} catch (ConstraintViolationException e) {
|
||||
ConstraintViolationException exception = e.getConstraintViolations().size() == 1 ?
|
||||
input.toConstraintViolationException(List.copyOf(e.getConstraintViolations()).getFirst().getMessage(), value) :
|
||||
input.toConstraintViolationException(e.getMessage(), value);
|
||||
resolvable.resolveWithError(exception);
|
||||
Input<?> finalInput = input;
|
||||
Set<InputOutputValidationException> exceptions = e.getConstraintViolations().stream()
|
||||
.map(c-> InputOutputValidationException.of(c.getMessage(), finalInput))
|
||||
.collect(Collectors.toSet());
|
||||
resolvable.resolveWithError(exceptions);
|
||||
}
|
||||
}
|
||||
} catch (ConstraintViolationException e) {
|
||||
resolvable.resolveWithError(e);
|
||||
} catch (Exception e) {
|
||||
ConstraintViolationException exception = input.toConstraintViolationException(e instanceof IllegalArgumentException ? e.getMessage() : e.toString(), resolvable.get().value());
|
||||
resolvable.resolveWithError(exception);
|
||||
} catch (IllegalArgumentException e){
|
||||
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage(), input));
|
||||
}
|
||||
catch (Exception e) {
|
||||
resolvable.resolveWithError(InputOutputValidationException.of(e.getMessage()));
|
||||
}
|
||||
|
||||
return resolvable.get();
|
||||
@@ -382,11 +374,11 @@ public class FlowInputOutput {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> Object resolveDefaultPropertyAs(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return Property.as((Property<T>) input.getDefaults(), renderer, clazz);
|
||||
return Property.as((Property<T>) input.getDefaults().skipCache(), renderer, clazz);
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> Object resolveDefaultPropertyAsList(Input<?> input, PropertyContext renderer, Class<T> clazz) throws IllegalVariableEvaluationException {
|
||||
return Property.asList((Property<List<T>>) input.getDefaults(), renderer, clazz);
|
||||
return Property.asList((Property<List<T>>) input.getDefaults().skipCache(), renderer, clazz);
|
||||
}
|
||||
|
||||
private RunContext buildRunContextForExecutionAndInputs(final FlowInterface flow, final Execution execution, Map<String, InputAndValue> dependencies, final boolean decryptSecrets) {
|
||||
@@ -445,8 +437,12 @@ public class FlowInputOutput {
|
||||
}
|
||||
return entry;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
throw output.toConstraintViolationException(e.getMessage(), current);
|
||||
}
|
||||
catch (IllegalArgumentException e){
|
||||
throw InputOutputValidationException.of(e.getMessage(), output);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw InputOutputValidationException.of(e.getMessage());
|
||||
}
|
||||
})
|
||||
.filter(Optional::isPresent)
|
||||
@@ -502,14 +498,14 @@ public class FlowInputOutput {
|
||||
yield storageInterface.from(execution, id, current.toString().substring(current.toString().lastIndexOf("/") + 1), new File(current.toString()));
|
||||
}
|
||||
}
|
||||
case JSON -> JacksonMapper.toObject(current.toString());
|
||||
case YAML -> YAML_MAPPER.readValue(current.toString(), JacksonMapper.OBJECT_TYPE_REFERENCE);
|
||||
case JSON -> (current instanceof Map || current instanceof Collection<?>) ? current : JacksonMapper.toObject(current.toString());
|
||||
case YAML -> (current instanceof Map || current instanceof Collection<?>) ? current : YAML_MAPPER.readValue(current.toString(), JacksonMapper.OBJECT_TYPE_REFERENCE);
|
||||
case URI -> {
|
||||
Matcher matcher = URI_PATTERN.matcher(current.toString());
|
||||
if (matcher.matches()) {
|
||||
yield current.toString();
|
||||
} else {
|
||||
throw new IllegalArgumentException("Expected `URI` but received `" + current + "`");
|
||||
throw new IllegalArgumentException("Invalid URI format.");
|
||||
}
|
||||
}
|
||||
case ARRAY, MULTISELECT -> {
|
||||
@@ -539,34 +535,10 @@ public class FlowInputOutput {
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw e;
|
||||
} catch (Throwable e) {
|
||||
throw new Exception("Expected `" + type + "` but received `" + current + "` with errors:\n```\n" + e.getMessage() + "\n```");
|
||||
throw new Exception(" errors:\n```\n" + e.getMessage() + "\n```");
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, Object> renderFlowOutputs(List<Output> outputs, RunContext runContext) throws IllegalVariableEvaluationException {
|
||||
if (outputs == null) return Map.of();
|
||||
|
||||
// render required outputs
|
||||
Map<String, Object> outputsById = outputs
|
||||
.stream()
|
||||
.filter(output -> output.getRequired() == null || output.getRequired())
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||
outputsById = runContext.render(outputsById);
|
||||
|
||||
// render optional outputs one by one to catch, log, and skip any error.
|
||||
for (io.kestra.core.models.flows.Output output : outputs) {
|
||||
if (Boolean.FALSE.equals(output.getRequired())) {
|
||||
try {
|
||||
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
|
||||
outputsById.put(output.getId(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
return outputsById;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mutable wrapper to hold a flow's input, and it's resolved value.
|
||||
*/
|
||||
@@ -595,27 +567,30 @@ public class FlowInputOutput {
|
||||
}
|
||||
|
||||
public void isDefault(boolean isDefault) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), isDefault, this.input.exceptions());
|
||||
}
|
||||
|
||||
public void setInput(final Input<?> input) {
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(input, this.input.value(), this.input.enabled(), this.input.isDefault(), this.input.exceptions());
|
||||
}
|
||||
|
||||
public void resolveWithEnabled(boolean enabled) {
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), input.value(), enabled, this.input.isDefault(), this.input.exceptions());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithValue(@Nullable Object value) {
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exception());
|
||||
this.input = new InputAndValue(this.input.input(), value, this.input.enabled(), this.input.isDefault(), this.input.exceptions());
|
||||
markAsResolved();
|
||||
}
|
||||
|
||||
public void resolveWithError(@Nullable ConstraintViolationException exception) {
|
||||
public void resolveWithError(@Nullable Set<InputOutputValidationException> exception) {
|
||||
this.input = new InputAndValue(this.input.input(), this.input.value(), this.input.enabled(), this.input.isDefault(), exception);
|
||||
markAsResolved();
|
||||
}
|
||||
private void resolveWithError(@Nullable InputOutputValidationException exception){
|
||||
resolveWithError(Collections.singleton(exception));
|
||||
}
|
||||
|
||||
private void markAsResolved() {
|
||||
this.isResolved = true;
|
||||
|
||||
@@ -11,6 +11,7 @@ import io.kestra.core.models.flows.State;
|
||||
import io.kestra.core.models.tasks.ResolvedTask;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.plugin.core.flow.Dag;
|
||||
|
||||
import java.util.*;
|
||||
@@ -143,6 +144,13 @@ public class FlowableUtils {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// have submitted, leave
|
||||
Optional<TaskRun> lastSubmitted = execution.findLastSubmitted(taskRuns);
|
||||
if (lastSubmitted.isPresent()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
|
||||
// last success, find next
|
||||
Optional<TaskRun> lastTerminated = execution.findLastTerminated(taskRuns);
|
||||
if (lastTerminated.isPresent()) {
|
||||
@@ -150,14 +158,41 @@ public class FlowableUtils {
|
||||
|
||||
if (currentTasks.size() > lastIndex + 1) {
|
||||
return Collections.singletonList(currentTasks.get(lastIndex + 1).toNextTaskRunIncrementIteration(execution, parentTaskRun.getIteration()));
|
||||
} else {
|
||||
return Collections.singletonList(currentTasks.getFirst().toNextTaskRunIncrementIteration(execution, parentTaskRun.getIteration()));
|
||||
}
|
||||
}
|
||||
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
public static Optional<State.Type> resolveSequentialState(
|
||||
Execution execution,
|
||||
List<ResolvedTask> tasks,
|
||||
List<ResolvedTask> errors,
|
||||
List<ResolvedTask> _finally,
|
||||
TaskRun parentTaskRun,
|
||||
RunContext runContext,
|
||||
boolean allowFailure,
|
||||
boolean allowWarning
|
||||
) {
|
||||
if (ListUtils.emptyOnNull(tasks).stream()
|
||||
.filter(resolvedTask -> !resolvedTask.getTask().getDisabled())
|
||||
.findAny()
|
||||
.isEmpty()) {
|
||||
return Optional.of(State.Type.SUCCESS);
|
||||
}
|
||||
|
||||
return resolveState(
|
||||
execution,
|
||||
tasks,
|
||||
errors,
|
||||
_finally,
|
||||
parentTaskRun,
|
||||
runContext,
|
||||
allowFailure,
|
||||
allowWarning
|
||||
);
|
||||
}
|
||||
|
||||
public static Optional<State.Type> resolveState(
|
||||
Execution execution,
|
||||
List<ResolvedTask> tasks,
|
||||
@@ -213,7 +248,7 @@ public class FlowableUtils {
|
||||
}
|
||||
} else {
|
||||
// first call, the error flow is not ready, we need to notify the parent task that can be failed to init error flows
|
||||
if (execution.hasFailed(tasks, parentTaskRun) || terminalState == State.Type.FAILED) {
|
||||
if (execution.hasFailedNoRetry(tasks, parentTaskRun) || terminalState == State.Type.FAILED) {
|
||||
return Optional.of(execution.guessFinalState(tasks, parentTaskRun, allowFailure, allowWarning, terminalState));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* InputAndOutput could be used to work with flow execution inputs and outputs.
|
||||
*/
|
||||
public interface InputAndOutput {
|
||||
/**
|
||||
* Reads the inputs of a flow execution.
|
||||
*/
|
||||
Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs);
|
||||
|
||||
/**
|
||||
* Processes the outputs of a flow execution (parse them based on their types).
|
||||
*/
|
||||
Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs);
|
||||
|
||||
/**
|
||||
* Render flow execution outputs.
|
||||
*/
|
||||
Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException;
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
import io.kestra.core.models.flows.Output;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
class InputAndOutputImpl implements InputAndOutput {
|
||||
private final FlowInputOutput flowInputOutput;
|
||||
private final RunContext runContext;
|
||||
|
||||
InputAndOutputImpl(ApplicationContext applicationContext, RunContext runContext) {
|
||||
this.flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
|
||||
this.runContext = runContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> readInputs(FlowInterface flow, Execution execution, Map<String, Object> inputs) {
|
||||
return flowInputOutput.readExecutionInputs(flow, execution, inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> typedOutputs(FlowInterface flow, Execution execution, Map<String, Object> rOutputs) {
|
||||
return flowInputOutput.typedOutputs(flow, execution, rOutputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> renderOutputs(List<Output> outputs) throws IllegalVariableEvaluationException {
|
||||
if (outputs == null) return Map.of();
|
||||
|
||||
// render required outputs
|
||||
Map<String, Object> outputsById = outputs
|
||||
.stream()
|
||||
.filter(output -> output.getRequired() == null || output.getRequired())
|
||||
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
|
||||
outputsById = runContext.render(outputsById);
|
||||
|
||||
// render optional outputs one by one to catch, log, and skip any error.
|
||||
for (io.kestra.core.models.flows.Output output : outputs) {
|
||||
if (Boolean.FALSE.equals(output.getRequired())) {
|
||||
try {
|
||||
outputsById.putAll(runContext.render(Map.of(output.getId(), output.getValue())));
|
||||
} catch (Exception e) {
|
||||
runContext.logger().warn("Failed to render optional flow output '{}'. Output is ignored.", output.getId(), e);
|
||||
outputsById.put(output.getId(), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
return outputsById;
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import io.kestra.core.encryption.EncryptionService;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.executions.AbstractMetricEntry;
|
||||
import io.kestra.core.models.property.Property;
|
||||
import io.kestra.core.models.property.PropertyContext;
|
||||
@@ -192,5 +193,27 @@ public abstract class RunContext implements PropertyContext {
|
||||
public record FlowInfo(String tenantId, String namespace, String id, Integer revision) {
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated there is no legitimate use case of this method outside the run context internal self-usage, so it should not be part of the interface
|
||||
*/
|
||||
@Deprecated(since = "1.2.0", forRemoval = true)
|
||||
public abstract boolean isInitialized();
|
||||
|
||||
/**
|
||||
* Get access to the ACL checker.
|
||||
* Plugins are responsible for using the ACL checker when they access restricted resources, for example,
|
||||
* when Namespace ACLs are used (EE).
|
||||
*/
|
||||
public abstract AclChecker acl();
|
||||
|
||||
/**
|
||||
* Clone this run context for a specific plugin.
|
||||
* @return a new run context with the plugin configuration of the given plugin.
|
||||
*/
|
||||
public abstract RunContext cloneForPlugin(Plugin plugin);
|
||||
|
||||
/**
|
||||
* @return an InputAndOutput that can be used to work with inputs and outputs.
|
||||
*/
|
||||
public abstract InputAndOutput inputAndOutput();
|
||||
}
|
||||
|
||||
@@ -12,9 +12,10 @@ import io.kestra.core.models.property.PropertyContext;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.plugins.PluginConfigurations;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.services.KVStoreService;
|
||||
import io.kestra.core.services.NamespaceService;
|
||||
import io.kestra.core.storages.InternalStorage;
|
||||
import io.kestra.core.storages.NamespaceFactory;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
@@ -48,7 +49,7 @@ public class RunContextFactory {
|
||||
protected StorageInterface storageInterface;
|
||||
|
||||
@Inject
|
||||
protected FlowService flowService;
|
||||
protected NamespaceService namespaceService;
|
||||
|
||||
@Inject
|
||||
protected MetricRegistry metricRegistry;
|
||||
@@ -76,6 +77,9 @@ public class RunContextFactory {
|
||||
@Inject
|
||||
private KVStoreService kvStoreService;
|
||||
|
||||
@Inject
|
||||
private NamespaceFactory namespaceFactory;
|
||||
|
||||
// hacky
|
||||
public RunContextInitializer initializer() {
|
||||
return applicationContext.getBean(RunContextInitializer.class);
|
||||
@@ -103,7 +107,7 @@ public class RunContextFactory {
|
||||
.withLogger(runContextLogger)
|
||||
// Execution
|
||||
.withPluginConfiguration(Map.of())
|
||||
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, flowService))
|
||||
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, namespaceService, namespaceFactory))
|
||||
.withVariableRenderer(variableRenderer)
|
||||
.withVariables(runVariableModifier.apply(
|
||||
newRunVariablesBuilder()
|
||||
@@ -133,7 +137,7 @@ public class RunContextFactory {
|
||||
.withLogger(runContextLogger)
|
||||
// Task
|
||||
.withPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass()))
|
||||
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, flowService))
|
||||
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, namespaceService, namespaceFactory))
|
||||
.withVariables(newRunVariablesBuilder()
|
||||
.withFlow(flow)
|
||||
.withTask(task)
|
||||
@@ -167,14 +171,16 @@ public class RunContextFactory {
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
public RunContext of(final FlowInterface flow, final Map<String, Object> variables) {
|
||||
RunContextLogger runContextLogger = new RunContextLogger();
|
||||
return newBuilder()
|
||||
.withLogger(runContextLogger)
|
||||
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forFlow(flow), storageInterface, flowService))
|
||||
.withVariables(variables)
|
||||
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forFlow(flow), storageInterface, namespaceService, namespaceFactory))
|
||||
.withVariables(newRunVariablesBuilder()
|
||||
.withFlow(flow)
|
||||
.withVariables(variables)
|
||||
.build(runContextLogger, PropertyContext.create(this.variableRenderer))
|
||||
)
|
||||
.withSecretInputs(secretInputsFromFlow(flow))
|
||||
.build();
|
||||
}
|
||||
@@ -212,7 +218,8 @@ public class RunContextFactory {
|
||||
}
|
||||
},
|
||||
storageInterface,
|
||||
flowService
|
||||
namespaceService,
|
||||
namespaceFactory
|
||||
))
|
||||
.withVariables(variables)
|
||||
.withTask(task)
|
||||
|
||||
@@ -1,15 +1,14 @@
|
||||
package io.kestra.core.runners;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import io.kestra.core.models.Plugin;
|
||||
import io.kestra.core.models.executions.TaskRun;
|
||||
import io.kestra.core.models.tasks.Task;
|
||||
import io.kestra.core.models.tasks.runners.TaskRunner;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
import io.kestra.core.models.triggers.TriggerContext;
|
||||
import io.kestra.core.plugins.PluginConfigurations;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.services.NamespaceService;
|
||||
import io.kestra.core.storages.InternalStorage;
|
||||
import io.kestra.core.storages.NamespaceFactory;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.utils.IdUtils;
|
||||
@@ -44,25 +43,14 @@ public class RunContextInitializer {
|
||||
protected StorageInterface storageInterface;
|
||||
|
||||
@Inject
|
||||
protected FlowService flowService;
|
||||
protected NamespaceFactory namespaceFactory;
|
||||
|
||||
@Inject
|
||||
protected NamespaceService namespaceService;
|
||||
|
||||
@Value("${kestra.encryption.secret-key}")
|
||||
protected Optional<String> secretKey;
|
||||
|
||||
/**
|
||||
* Initializes the given {@link RunContext} for the given {@link Plugin}.
|
||||
*
|
||||
* @param runContext The {@link RunContext} to initialize.
|
||||
* @param plugin The {@link TaskRunner} used for initialization.
|
||||
* @return The {@link RunContext} to initialize
|
||||
*/
|
||||
public DefaultRunContext forPlugin(final DefaultRunContext runContext,
|
||||
final Plugin plugin) {
|
||||
runContext.init(applicationContext);
|
||||
runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(plugin.getType(), plugin.getClass()));
|
||||
return runContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the given {@link RunContext} for the given {@link WorkerTask} for executor.
|
||||
*
|
||||
@@ -135,7 +123,7 @@ public class RunContextInitializer {
|
||||
|
||||
runContext.setVariables(enrichedVariables);
|
||||
runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass()));
|
||||
runContext.setStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, flowService));
|
||||
runContext.setStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, namespaceService, namespaceFactory));
|
||||
runContext.setLogger(runContextLogger);
|
||||
runContext.setTask(task);
|
||||
|
||||
@@ -230,7 +218,8 @@ public class RunContextInitializer {
|
||||
runContextLogger.logger(),
|
||||
context,
|
||||
storageInterface,
|
||||
flowService
|
||||
namespaceService,
|
||||
namespaceFactory
|
||||
);
|
||||
|
||||
runContext.setLogger(runContextLogger);
|
||||
|
||||
@@ -55,11 +55,11 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
||||
|
||||
public RunContextLogger(QueueInterface<LogEntry> logQueue, LogEntry logEntry, org.slf4j.event.Level loglevel, boolean logToFile) {
|
||||
if (logEntry.getTaskId() != null) {
|
||||
this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTaskId();
|
||||
this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTaskId();
|
||||
} else if (logEntry.getTriggerId() != null) {
|
||||
this.loggerName = "flow." + logEntry.getFlowId() + "." + logEntry.getTriggerId();
|
||||
this.loggerName = baseLoggerName(logEntry) + "." + logEntry.getTriggerId();
|
||||
} else {
|
||||
this.loggerName = "flow." + logEntry.getFlowId();
|
||||
this.loggerName = baseLoggerName(logEntry);
|
||||
}
|
||||
|
||||
this.logQueue = logQueue;
|
||||
@@ -68,6 +68,10 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
|
||||
this.logToFile = logToFile;
|
||||
}
|
||||
|
||||
private String baseLoggerName(LogEntry logEntry) {
|
||||
return "flow." + logEntry.getTenantId() + "." + logEntry.getNamespace() + "." + logEntry.getFlowId();
|
||||
}
|
||||
|
||||
private static List<LogEntry> logEntry(ILoggingEvent event, String message, org.slf4j.event.Level level, LogEntry logEntry) {
|
||||
Iterable<String> split;
|
||||
|
||||
|
||||
@@ -2,11 +2,13 @@ package io.kestra.core.runners.pebble;
|
||||
|
||||
import io.kestra.core.runners.VariableRenderer;
|
||||
import io.kestra.core.runners.pebble.functions.RenderingFunctionInterface;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.core.annotation.Nullable;
|
||||
import io.pebbletemplates.pebble.PebbleEngine;
|
||||
import io.pebbletemplates.pebble.extension.Extension;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import io.pebbletemplates.pebble.lexer.Syntax;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@@ -18,35 +20,44 @@ import java.util.stream.Collectors;
|
||||
|
||||
@Singleton
|
||||
public class PebbleEngineFactory {
|
||||
|
||||
|
||||
private final ApplicationContext applicationContext;
|
||||
private final VariableRenderer.VariableConfiguration variableConfiguration;
|
||||
|
||||
private final MeterRegistry meterRegistry;
|
||||
|
||||
@Inject
|
||||
public PebbleEngineFactory(ApplicationContext applicationContext, @Nullable VariableRenderer.VariableConfiguration variableConfiguration) {
|
||||
public PebbleEngineFactory(ApplicationContext applicationContext, @Nullable VariableRenderer.VariableConfiguration variableConfiguration, MeterRegistry meterRegistry) {
|
||||
this.applicationContext = applicationContext;
|
||||
this.variableConfiguration = variableConfiguration;
|
||||
this.meterRegistry = meterRegistry;
|
||||
}
|
||||
|
||||
|
||||
public PebbleEngine create() {
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder();
|
||||
this.applicationContext.getBeansOfType(Extension.class).forEach(builder::extension);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
||||
public PebbleEngine createWithCustomSyntax(Syntax syntax, Class<? extends Extension> extension) {
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder()
|
||||
.syntax(syntax);
|
||||
this.applicationContext.getBeansOfType(extension).forEach(builder::extension);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public PebbleEngine createWithMaskedFunctions(VariableRenderer renderer, final List<String> functionsToMask) {
|
||||
|
||||
|
||||
PebbleEngine.Builder builder = newPebbleEngineBuilder();
|
||||
|
||||
|
||||
this.applicationContext.getBeansOfType(Extension.class).stream()
|
||||
.map(e -> functionsToMask.stream().anyMatch(fun -> e.getFunctions().containsKey(fun))
|
||||
? extensionWithMaskedFunctions(renderer, e, functionsToMask)
|
||||
: e)
|
||||
.forEach(builder::extension);
|
||||
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
||||
private PebbleEngine.Builder newPebbleEngineBuilder() {
|
||||
PebbleEngine.Builder builder = new PebbleEngine.Builder()
|
||||
.registerExtensionCustomizer(ExtensionCustomizer::new)
|
||||
@@ -54,13 +65,15 @@ public class PebbleEngineFactory {
|
||||
.cacheActive(this.variableConfiguration.getCacheEnabled())
|
||||
.newLineTrimming(false)
|
||||
.autoEscaping(false);
|
||||
|
||||
|
||||
if (this.variableConfiguration.getCacheEnabled()) {
|
||||
builder = builder.templateCache(new PebbleLruCache(this.variableConfiguration.getCacheSize()));
|
||||
PebbleLruCache cache = new PebbleLruCache(this.variableConfiguration.getCacheSize());
|
||||
cache.register(meterRegistry);
|
||||
builder = builder.templateCache(cache);
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
|
||||
private Extension extensionWithMaskedFunctions(VariableRenderer renderer, Extension initialExtension, List<String> maskedFunctions) {
|
||||
return (Extension) Proxy.newProxyInstance(
|
||||
initialExtension.getClass().getClassLoader(),
|
||||
@@ -74,16 +87,16 @@ public class PebbleEngineFactory {
|
||||
} else if (RenderingFunctionInterface.class.isAssignableFrom(entry.getValue().getClass())) {
|
||||
return Map.entry(entry.getKey(), this.variableRendererProxy(renderer, entry.getValue()));
|
||||
}
|
||||
|
||||
|
||||
return entry;
|
||||
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
}
|
||||
|
||||
|
||||
return method.invoke(initialExtension, methodArgs);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
private Function variableRendererProxy(VariableRenderer renderer, Function initialFunction) {
|
||||
return (Function) Proxy.newProxyInstance(
|
||||
initialFunction.getClass().getClassLoader(),
|
||||
@@ -96,7 +109,7 @@ public class PebbleEngineFactory {
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
private Function maskedFunctionProxy(Function initialFunction) {
|
||||
return (Function) Proxy.newProxyInstance(
|
||||
initialFunction.getClass().getClassLoader(),
|
||||
|
||||
@@ -1,29 +1,29 @@
|
||||
package io.kestra.core.runners.pebble;
|
||||
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
|
||||
import io.pebbletemplates.pebble.cache.PebbleCache;
|
||||
import io.pebbletemplates.pebble.template.PebbleTemplate;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.function.Function;
|
||||
|
||||
@Slf4j
|
||||
public class PebbleLruCache implements PebbleCache<Object, PebbleTemplate> {
|
||||
Cache<Object, PebbleTemplate> cache;
|
||||
private final Cache<Object, PebbleTemplate> cache;
|
||||
|
||||
public PebbleLruCache(int maximumSize) {
|
||||
cache = CacheBuilder.newBuilder()
|
||||
cache = Caffeine.newBuilder()
|
||||
.initialCapacity(250)
|
||||
.maximumSize(maximumSize)
|
||||
.recordStats()
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PebbleTemplate computeIfAbsent(Object key, Function<? super Object, ? extends PebbleTemplate> mappingFunction) {
|
||||
try {
|
||||
return cache.get(key, () -> mappingFunction.apply(key));
|
||||
return cache.get(key, mappingFunction);
|
||||
} catch (Exception e) {
|
||||
// we retry the mapping function in order to let the exception be thrown instead of being capture by cache
|
||||
return mappingFunction.apply(key);
|
||||
@@ -34,4 +34,8 @@ public class PebbleLruCache implements PebbleCache<Object, PebbleTemplate> {
|
||||
public void invalidateAll() {
|
||||
cache.invalidateAll();
|
||||
}
|
||||
|
||||
public void register(MeterRegistry meterRegistry) {
|
||||
CaffeineCacheMetrics.monitor(meterRegistry, cache, "pebble-template");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,11 +2,8 @@ package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.runners.LocalPathFactory;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.storages.InternalNamespace;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.kestra.core.storages.StorageInterface;
|
||||
import io.kestra.core.services.NamespaceService;
|
||||
import io.kestra.core.storages.*;
|
||||
import io.kestra.core.utils.Slugify;
|
||||
import io.micronaut.context.annotation.Value;
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
@@ -36,7 +33,7 @@ abstract class AbstractFileFunction implements Function {
|
||||
private static final Pattern EXECUTION_FILE = Pattern.compile(".*/.*/executions/.*/tasks/.*/.*");
|
||||
|
||||
@Inject
|
||||
protected FlowService flowService;
|
||||
protected NamespaceService namespaceService;
|
||||
|
||||
@Inject
|
||||
protected StorageInterface storageInterface;
|
||||
@@ -44,6 +41,9 @@ abstract class AbstractFileFunction implements Function {
|
||||
@Inject
|
||||
protected LocalPathFactory localPathFactory;
|
||||
|
||||
@Inject
|
||||
protected NamespaceFactory namespaceFactory;
|
||||
|
||||
@Value("${" + LocalPath.ENABLE_FILE_FUNCTIONS_CONFIG + ":true}")
|
||||
protected boolean enableFileProtocol;
|
||||
|
||||
@@ -81,23 +81,21 @@ abstract class AbstractFileFunction implements Function {
|
||||
} else if (str.startsWith(LocalPath.FILE_PROTOCOL)) {
|
||||
fileUri = URI.create(str);
|
||||
namespace = checkEnabledLocalFileAndReturnNamespace(args, flow);
|
||||
} else if(str.startsWith(Namespace.NAMESPACE_FILE_SCHEME)) {
|
||||
URI nsFileUri = URI.create(str);
|
||||
namespace = checkedAllowedNamespaceAndReturnNamespace(args, nsFileUri, tenantId, flow);
|
||||
InternalNamespace internalNamespace = new InternalNamespace(flow.get(TENANT_ID), namespace, storageInterface);
|
||||
fileUri = internalNamespace.get(Path.of(nsFileUri.getPath())).uri();
|
||||
} else if (str.startsWith(Namespace.NAMESPACE_FILE_SCHEME)) {
|
||||
fileUri = URI.create(str);
|
||||
namespace = checkedAllowedNamespaceAndReturnNamespace(args, fileUri, tenantId, flow);
|
||||
} else if (URI_PATTERN.matcher(str).matches()) {
|
||||
// it is an unsupported URI
|
||||
throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(str));
|
||||
} else {
|
||||
fileUri = URI.create(Namespace.NAMESPACE_FILE_SCHEME + ":///" + str);
|
||||
namespace = (String) Optional.ofNullable(args.get(NAMESPACE)).orElse(flow.get(NAMESPACE));
|
||||
fileUri = URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.namespaceFilePrefix(namespace) + "/" + str);
|
||||
flowService.checkAllowedNamespace(tenantId, namespace, tenantId, flow.get(NAMESPACE));
|
||||
namespaceService.checkAllowedNamespace(tenantId, namespace, tenantId, flow.get(NAMESPACE));
|
||||
}
|
||||
} else {
|
||||
throw new PebbleException(null, "Unable to read the file " + path, lineNumber, self.getName());
|
||||
}
|
||||
return fileFunction(context, fileUri, namespace, tenantId);
|
||||
return fileFunction(context, fileUri, namespace, tenantId, args);
|
||||
} catch (IOException e) {
|
||||
throw new PebbleException(e, e.getMessage(), lineNumber, self.getName());
|
||||
}
|
||||
@@ -110,7 +108,7 @@ abstract class AbstractFileFunction implements Function {
|
||||
|
||||
protected abstract String getErrorMessage();
|
||||
|
||||
protected abstract Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException;
|
||||
protected abstract Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException;
|
||||
|
||||
boolean isFileUriValid(String namespace, String flowId, String executionId, URI path) {
|
||||
// Internal storage URI should be: kestra:///$namespace/$flowId/executions/$executionId/tasks/$taskName/$taskRunId/$random.ion or kestra:///$namespace/$flowId/executions/$executionId/trigger/$triggerName/$random.ion
|
||||
@@ -177,7 +175,7 @@ abstract class AbstractFileFunction implements Function {
|
||||
// 5. replace '/' with '.'
|
||||
namespace = namespace.replace("/", ".");
|
||||
|
||||
flowService.checkAllowedNamespace(tenantId, namespace, tenantId, fromNamespace);
|
||||
namespaceService.checkAllowedNamespace(tenantId, namespace, tenantId, fromNamespace);
|
||||
|
||||
return namespace;
|
||||
}
|
||||
@@ -198,7 +196,7 @@ abstract class AbstractFileFunction implements Function {
|
||||
// we will transform nsfile URI into a kestra URI so it is handled seamlessly by all functions
|
||||
String customNs = Optional.ofNullable((String) args.get(NAMESPACE)).orElse(nsFileUri.getAuthority());
|
||||
if (customNs != null) {
|
||||
flowService.checkAllowedNamespace(tenantId, customNs, tenantId, flow.get(NAMESPACE));
|
||||
namespaceService.checkAllowedNamespace(tenantId, customNs, tenantId, flow.get(NAMESPACE));
|
||||
}
|
||||
return Optional.ofNullable(customNs).orElse(flow.get(NAMESPACE));
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ package io.kestra.core.runners.pebble.functions;
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.models.tasks.retrys.Exponential;
|
||||
import io.kestra.core.runners.pebble.PebbleUtils;
|
||||
import io.kestra.core.services.LogService;
|
||||
import io.kestra.core.services.ExecutionLogService;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.core.utils.RetryUtils;
|
||||
import io.micronaut.context.annotation.Requires;
|
||||
@@ -23,14 +23,11 @@ import java.util.Map;
|
||||
@Requires(property = "kestra.repository.type")
|
||||
public class ErrorLogsFunction implements Function {
|
||||
@Inject
|
||||
private LogService logService;
|
||||
private ExecutionLogService logService;
|
||||
|
||||
@Inject
|
||||
private PebbleUtils pebbleUtils;
|
||||
|
||||
@Inject
|
||||
private RetryUtils retryUtils;
|
||||
|
||||
@Override
|
||||
public List<String> getArgumentNames() {
|
||||
return Collections.emptyList();
|
||||
@@ -46,7 +43,7 @@ public class ErrorLogsFunction implements Function {
|
||||
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
|
||||
Map<String, String> execution = (Map<String, String>) context.getVariable("execution");
|
||||
|
||||
RetryUtils.Instance<List<LogEntry>, Throwable> retry = retryUtils.of(Exponential.builder()
|
||||
RetryUtils.Instance<List<LogEntry>, Throwable> retry = RetryUtils.of(Exponential.builder()
|
||||
.delayFactor(2.0)
|
||||
.interval(Duration.ofMillis(100))
|
||||
.maxInterval(Duration.ofSeconds(1))
|
||||
|
||||
@@ -1,22 +1,30 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFile;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Map;
|
||||
|
||||
@Singleton
|
||||
public class FileExistsFunction extends AbstractFileFunction {
|
||||
private static final String ERROR_MESSAGE = "The 'fileExists' function expects an argument 'path' that is a path to the internal storage URI.";
|
||||
|
||||
@Override
|
||||
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException {
|
||||
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException {
|
||||
return switch (path.getScheme()) {
|
||||
case StorageContext.KESTRA_SCHEME -> storageInterface.exists(tenantId, namespace, path);
|
||||
case LocalPath.FILE_SCHEME -> localPathFactory.createLocalPath().exists(path);
|
||||
case Namespace.NAMESPACE_FILE_SCHEME -> {
|
||||
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storageInterface);
|
||||
yield namespaceStorage.exists(NamespaceFile.normalize(Path.of(path.getPath()), true));
|
||||
}
|
||||
default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path));
|
||||
};
|
||||
}
|
||||
|
||||
@@ -2,19 +2,23 @@ package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFile;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
import jakarta.inject.Singleton;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.util.Map;
|
||||
|
||||
@Singleton
|
||||
public class FileSizeFunction extends AbstractFileFunction {
|
||||
private static final String ERROR_MESSAGE = "The 'fileSize' function expects an argument 'path' that is a path to the internal storage URI.";
|
||||
|
||||
@Override
|
||||
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException {
|
||||
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException {
|
||||
return switch (path.getScheme()) {
|
||||
case StorageContext.KESTRA_SCHEME -> {
|
||||
FileAttributes fileAttributes = storageInterface.getAttributes(tenantId, namespace, path);
|
||||
@@ -24,6 +28,12 @@ public class FileSizeFunction extends AbstractFileFunction {
|
||||
BasicFileAttributes fileAttributes = localPathFactory.createLocalPath().getAttributes(path);
|
||||
yield fileAttributes.size();
|
||||
}
|
||||
case Namespace.NAMESPACE_FILE_SCHEME -> {
|
||||
FileAttributes fileAttributes = namespaceFactory
|
||||
.of(tenantId, namespace, storageInterface)
|
||||
.getFileMetadata(NamespaceFile.normalize(Path.of(path.getPath()), true));
|
||||
yield fileAttributes.getSize();
|
||||
}
|
||||
default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path));
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,19 +1,24 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.storages.FileAttributes;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFile;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
import jakarta.inject.Singleton;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Map;
|
||||
|
||||
@Singleton
|
||||
public class IsFileEmptyFunction extends AbstractFileFunction {
|
||||
private static final String ERROR_MESSAGE = "The 'isFileEmpty' function expects an argument 'path' that is a path to a namespace file or an internal storage URI.";
|
||||
|
||||
@Override
|
||||
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException {
|
||||
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException {
|
||||
return switch (path.getScheme()) {
|
||||
case StorageContext.KESTRA_SCHEME -> {
|
||||
try (InputStream inputStream = storageInterface.get(tenantId, namespace, path)) {
|
||||
@@ -27,6 +32,12 @@ public class IsFileEmptyFunction extends AbstractFileFunction {
|
||||
yield inputStream.read(buffer, 0, 1) <= 0;
|
||||
}
|
||||
}
|
||||
case Namespace.NAMESPACE_FILE_SCHEME -> {
|
||||
FileAttributes fileAttributes = namespaceFactory
|
||||
.of(tenantId, namespace, storageInterface)
|
||||
.getFileMetadata(NamespaceFile.normalize(Path.of(path.getPath()), true));
|
||||
yield fileAttributes.getSize() <= 0;
|
||||
}
|
||||
default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path));
|
||||
};
|
||||
}
|
||||
@@ -35,4 +46,4 @@ public class IsFileEmptyFunction extends AbstractFileFunction {
|
||||
protected String getErrorMessage() {
|
||||
return ERROR_MESSAGE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,20 +1,37 @@
|
||||
package io.kestra.core.runners.pebble.functions;
|
||||
|
||||
import io.kestra.core.runners.LocalPath;
|
||||
import io.kestra.core.storages.Namespace;
|
||||
import io.kestra.core.storages.NamespaceFile;
|
||||
import io.kestra.core.storages.StorageContext;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@Singleton
|
||||
public class ReadFileFunction extends AbstractFileFunction {
|
||||
public static final String VERSION = "version";
|
||||
|
||||
private static final String ERROR_MESSAGE = "The 'read' function expects an argument 'path' that is a path to a namespace file or an internal storage URI.";
|
||||
|
||||
@Override
|
||||
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId) throws IOException {
|
||||
public List<String> getArgumentNames() {
|
||||
return Stream.concat(
|
||||
super.getArgumentNames().stream(),
|
||||
Stream.of(VERSION)
|
||||
).toList();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object fileFunction(EvaluationContext context, URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException {
|
||||
return switch (path.getScheme()) {
|
||||
case StorageContext.KESTRA_SCHEME -> {
|
||||
try (InputStream inputStream = storageInterface.get(tenantId, namespace, path)) {
|
||||
@@ -26,12 +43,30 @@ public class ReadFileFunction extends AbstractFileFunction {
|
||||
yield new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
|
||||
}
|
||||
}
|
||||
case Namespace.NAMESPACE_FILE_SCHEME -> {
|
||||
try (InputStream inputStream = contentInputStream(path, namespace, tenantId, args)) {
|
||||
yield new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
|
||||
}
|
||||
}
|
||||
default -> throw new IllegalArgumentException(SCHEME_NOT_SUPPORTED_ERROR.formatted(path));
|
||||
};
|
||||
}
|
||||
|
||||
private InputStream contentInputStream(URI path, String namespace, String tenantId, Map<String, Object> args) throws IOException {
|
||||
Namespace namespaceStorage = namespaceFactory.of(tenantId, namespace, storageInterface);
|
||||
|
||||
if (args.containsKey(VERSION)) {
|
||||
return namespaceStorage.getFileContent(
|
||||
NamespaceFile.normalize(Path.of(path.getPath()), true),
|
||||
Integer.parseInt(args.get(VERSION).toString())
|
||||
);
|
||||
}
|
||||
|
||||
return namespaceStorage.getFileContent(NamespaceFile.normalize(Path.of(path.getPath()), true));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getErrorMessage() {
|
||||
return ERROR_MESSAGE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import io.kestra.core.secret.SecretNotFoundException;
|
||||
import io.kestra.core.secret.SecretService;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.services.FlowService;
|
||||
import io.kestra.core.services.NamespaceService;
|
||||
import io.pebbletemplates.pebble.error.PebbleException;
|
||||
import io.pebbletemplates.pebble.extension.Function;
|
||||
import io.pebbletemplates.pebble.template.EvaluationContext;
|
||||
@@ -36,7 +37,7 @@ public class SecretFunction implements Function {
|
||||
private SecretService secretService;
|
||||
|
||||
@Inject
|
||||
private FlowService flowService;
|
||||
private NamespaceService namespaceService;
|
||||
|
||||
@Override
|
||||
public List<String> getArgumentNames() {
|
||||
@@ -56,7 +57,7 @@ public class SecretFunction implements Function {
|
||||
if (namespace == null) {
|
||||
namespace = flowNamespace;
|
||||
} else {
|
||||
flowService.checkAllowedNamespace(flowTenantId, namespace, flowTenantId, flowNamespace);
|
||||
namespaceService.checkAllowedNamespace(flowTenantId, namespace, flowTenantId, flowNamespace);
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
@@ -26,7 +26,14 @@ public class ListOrMapOfLabelDeserializer extends JsonDeserializer<List<Label>>
|
||||
else if (p.hasToken(JsonToken.START_ARRAY)) {
|
||||
// deserialize as list
|
||||
List<Map<String, String>> ret = ctxt.readValue(p, List.class);
|
||||
return ret.stream().map(map -> new Label(map.get("key"), map.get("value"))).toList();
|
||||
return ret.stream().map(map -> {
|
||||
Object value = map.get("value");
|
||||
if (isAllowedType(value)) {
|
||||
return new Label(map.get("key"), String.valueOf(value));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported type for key: " + map.get("key") + ", value: " + value);
|
||||
}
|
||||
}).toList();
|
||||
}
|
||||
else if (p.hasToken(JsonToken.START_OBJECT)) {
|
||||
// deserialize as map
|
||||
|
||||
@@ -35,6 +35,10 @@ public final class YamlParser {
|
||||
return read(input, cls, type(cls));
|
||||
}
|
||||
|
||||
public static <T> T parse(String input, Class<T> cls, Boolean strict) {
|
||||
return strict ? read(input, cls, type(cls)) : readNonStrict(input, cls, type(cls));
|
||||
}
|
||||
|
||||
public static <T> T parse(Map<String, Object> input, Class<T> cls, Boolean strict) {
|
||||
ObjectMapper currentMapper = strict ? STRICT_MAPPER : NON_STRICT_MAPPER;
|
||||
|
||||
@@ -81,7 +85,31 @@ public final class YamlParser {
|
||||
throw toConstraintViolationException(input, resource, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> T readNonStrict(String input, Class<T> objectClass, String resource) {
|
||||
try {
|
||||
return NON_STRICT_MAPPER.readValue(input, objectClass);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw toConstraintViolationException(input, resource, e);
|
||||
}
|
||||
}
|
||||
private static String formatYamlErrorMessage(String originalMessage, JsonProcessingException e) {
|
||||
StringBuilder friendlyMessage = new StringBuilder();
|
||||
if (originalMessage.contains("Expected a field name")) {
|
||||
friendlyMessage.append("YAML syntax error: Invalid structure. Check indentation and ensure all fields are properly formatted.");
|
||||
} else if (originalMessage.contains("MappingStartEvent")) {
|
||||
friendlyMessage.append("YAML syntax error: Unexpected mapping start. Verify that scalar values are properly quoted if needed.");
|
||||
} else if (originalMessage.contains("Scalar value")) {
|
||||
friendlyMessage.append("YAML syntax error: Expected a simple value but found complex structure. Check for unquoted special characters.");
|
||||
} else {
|
||||
friendlyMessage.append("YAML parsing error: ").append(originalMessage.replaceAll("org\\.yaml\\.snakeyaml.*", "").trim());
|
||||
}
|
||||
if (e.getLocation() != null) {
|
||||
int line = e.getLocation().getLineNr();
|
||||
friendlyMessage.append(String.format(" (at line %d)", line));
|
||||
}
|
||||
// Return a generic but cleaner message for other YAML errors
|
||||
return friendlyMessage.toString();
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> ConstraintViolationException toConstraintViolationException(T target, String resource, JsonProcessingException e) {
|
||||
if (e.getCause() instanceof ConstraintViolationException constraintViolationException) {
|
||||
@@ -121,11 +149,12 @@ public final class YamlParser {
|
||||
)
|
||||
));
|
||||
} else {
|
||||
String userFriendlyMessage = formatYamlErrorMessage(e.getMessage(), e);
|
||||
return new ConstraintViolationException(
|
||||
"Illegal " + resource + " source: " + e.getMessage(),
|
||||
"Illegal " + resource + " source: " + userFriendlyMessage,
|
||||
Collections.singleton(
|
||||
ManualConstraintViolation.of(
|
||||
e.getCause() == null ? e.getMessage() : e.getMessage() + "\nCaused by: " + e.getCause().getMessage(),
|
||||
userFriendlyMessage,
|
||||
target,
|
||||
(Class<T>) target.getClass(),
|
||||
"yaml",
|
||||
@@ -136,4 +165,3 @@ public final class YamlParser {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ import com.cronutils.utils.VisibleForTesting;
|
||||
import io.kestra.core.exceptions.InternalException;
|
||||
import io.kestra.core.models.conditions.Condition;
|
||||
import io.kestra.core.models.conditions.ConditionContext;
|
||||
import io.kestra.core.models.conditions.ScheduleCondition;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.Flow;
|
||||
import io.kestra.core.models.flows.FlowInterface;
|
||||
@@ -65,16 +64,6 @@ public class ConditionService {
|
||||
return this.valid(flow, conditions, conditionContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that all conditions are valid.
|
||||
* Warning, this method throws if a condition cannot be evaluated.
|
||||
*/
|
||||
public boolean isValid(List<ScheduleCondition> conditions, ConditionContext conditionContext) throws InternalException {
|
||||
return conditions
|
||||
.stream()
|
||||
.allMatch(throwPredicate(condition -> condition.test(conditionContext)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that all conditions are valid.
|
||||
* Warning, this method throws if a condition cannot be evaluated.
|
||||
|
||||
@@ -2,12 +2,15 @@ package io.kestra.core.services;
|
||||
|
||||
import io.kestra.core.models.executions.LogEntry;
|
||||
import io.kestra.core.repositories.LogRepositoryInterface;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import io.micronaut.data.model.Sort;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.InputStream;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
@@ -17,9 +20,42 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
@Singleton
|
||||
public class ExecutionLogService {
|
||||
|
||||
private final LogRepositoryInterface logRepository;
|
||||
|
||||
@Inject
|
||||
private LogRepositoryInterface logRepository;
|
||||
public ExecutionLogService(LogRepositoryInterface logRepository) {
|
||||
this.logRepository = logRepository;
|
||||
}
|
||||
|
||||
/**
|
||||
* Purges log entries matching the given criteria.
|
||||
*
|
||||
* @param tenantId the tenant identifier
|
||||
* @param namespace the namespace of the flow
|
||||
* @param flowId the flow identifier
|
||||
* @param executionId the execution identifier
|
||||
* @param logLevels the list of log levels to delete
|
||||
* @param startDate the start of the date range
|
||||
* @param endDate the end of the date range.
|
||||
* @return the number of log entries deleted
|
||||
*/
|
||||
public int purge(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate) {
|
||||
return logRepository.deleteByQuery(tenantId, namespace, flowId, executionId, logLevels, startDate, endDate);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Fetches the error logs of an execution.
|
||||
* <p>
|
||||
* This method limits the results to the first 25 error logs, ordered by timestamp asc.
|
||||
*
|
||||
* @return the log entries
|
||||
*/
|
||||
public List<LogEntry> errorLogs(String tenantId, String executionId) {
|
||||
return logRepository.findByExecutionId(tenantId, executionId, Level.ERROR, Pageable.from(1, 25, Sort.of(Sort.Order.asc("timestamp"))));
|
||||
}
|
||||
|
||||
public InputStream getExecutionLogsAsStream(String tenantId,
|
||||
String executionId,
|
||||
Level minLevel,
|
||||
|
||||
@@ -754,7 +754,7 @@ public class ExecutionService {
|
||||
var parentTaskRun = execution.findTaskRunByTaskRunId(taskRun.getParentTaskRunId());
|
||||
Execution newExecution = execution;
|
||||
if (parentTaskRun.getState().getCurrent() != State.Type.KILLED) {
|
||||
newExecution = newExecution.withTaskRun(parentTaskRun.withState(State.Type.KILLED));
|
||||
newExecution = newExecution.withTaskRun(parentTaskRun.withStateAndAttempt(State.Type.KILLED));
|
||||
}
|
||||
if (parentTaskRun.getParentTaskRunId() != null) {
|
||||
return killParentTaskruns(parentTaskRun, newExecution);
|
||||
|
||||
@@ -2,8 +2,10 @@ package io.kestra.core.services;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import io.kestra.core.exceptions.FlowProcessingException;
|
||||
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
|
||||
import io.kestra.core.models.executions.Execution;
|
||||
import io.kestra.core.models.flows.*;
|
||||
import io.kestra.core.models.flows.check.Check;
|
||||
import io.kestra.core.models.tasks.RunnableTask;
|
||||
import io.kestra.core.models.topologies.FlowTopology;
|
||||
import io.kestra.core.models.triggers.AbstractTrigger;
|
||||
@@ -12,10 +14,13 @@ import io.kestra.core.models.validations.ValidateConstraintViolation;
|
||||
import io.kestra.core.plugins.PluginRegistry;
|
||||
import io.kestra.core.repositories.FlowRepositoryInterface;
|
||||
import io.kestra.core.repositories.FlowTopologyRepositoryInterface;
|
||||
import io.kestra.core.runners.RunContext;
|
||||
import io.kestra.core.runners.RunContextFactory;
|
||||
import io.kestra.core.serializers.JacksonMapper;
|
||||
import io.kestra.core.utils.ListUtils;
|
||||
import io.kestra.plugin.core.flow.Pause;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Provider;
|
||||
import jakarta.inject.Singleton;
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -54,6 +59,9 @@ public class FlowService {
|
||||
@Inject
|
||||
Optional<FlowTopologyRepositoryInterface> flowTopologyRepository;
|
||||
|
||||
@Inject
|
||||
Provider<RunContextFactory> runContextFactory; // Lazy init: avoid circular dependency error.
|
||||
|
||||
/**
|
||||
* Validates and creates the given flow.
|
||||
* <p>
|
||||
@@ -84,6 +92,57 @@ public class FlowService {
|
||||
return flowRepository
|
||||
.orElseThrow(() -> new IllegalStateException("Cannot perform operation on flow. Cause: No FlowRepository"));
|
||||
}
|
||||
private static String formatValidationError(String message) {
|
||||
if (message.startsWith("Illegal flow source:")) {
|
||||
// Already formatted by YamlParser, return as-is
|
||||
return message;
|
||||
}
|
||||
// For other validation errors, provide context
|
||||
return "Validation error: " + message;
|
||||
}
|
||||
/**
|
||||
* Evaluates all checks defined in the given flow using the provided inputs.
|
||||
* <p>
|
||||
* Each check's {@link Check#getCondition()} is evaluated in the context of the flow.
|
||||
* If a condition evaluates to {@code false} or fails to evaluate due to a
|
||||
* variable error, the corresponding {@link Check} is added to the returned list.
|
||||
* </p>
|
||||
*
|
||||
* @param flow the flow containing the checks to evaluate
|
||||
* @param inputs the input values used when evaluating the conditions
|
||||
* @return a list of checks whose conditions evaluated to {@code false} or failed to evaluate
|
||||
*/
|
||||
public List<Check> getFailedChecks(Flow flow, Map<String, Object> inputs) {
|
||||
if (!ListUtils.isEmpty(flow.getChecks())) {
|
||||
RunContext runContext = runContextFactory.get().of(flow, Map.of("inputs", inputs));
|
||||
List<Check> falseConditions = new ArrayList<>();
|
||||
for (Check check : flow.getChecks()) {
|
||||
try {
|
||||
boolean result = Boolean.TRUE.equals(runContext.renderTyped(check.getCondition()));
|
||||
if (!result) {
|
||||
falseConditions.add(check);
|
||||
}
|
||||
} catch (IllegalVariableEvaluationException e) {
|
||||
log.debug("[tenant: {}] [namespace: {}] [flow: {}] Failed to evaluate check condition. Cause.: {}",
|
||||
flow.getTenantId(),
|
||||
flow.getNamespace(),
|
||||
flow.getId(),
|
||||
e.getMessage(),
|
||||
e
|
||||
);
|
||||
falseConditions.add(Check
|
||||
.builder()
|
||||
.message("Failed to evaluate check condition. Cause: " + e.getMessage())
|
||||
.behavior(Check.Behavior.BLOCK_EXECUTION)
|
||||
.style(Check.Style.ERROR)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
}
|
||||
return falseConditions;
|
||||
}
|
||||
return List.of();
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the given flow source.
|
||||
@@ -122,10 +181,12 @@ public class FlowService {
|
||||
modelValidator.validate(pluginDefaultService.injectAllDefaults(flow, false));
|
||||
|
||||
} catch (ConstraintViolationException e) {
|
||||
validateConstraintViolationBuilder.constraints(e.getMessage());
|
||||
String friendlyMessage = formatValidationError(e.getMessage());
|
||||
validateConstraintViolationBuilder.constraints(friendlyMessage);
|
||||
} catch (FlowProcessingException e) {
|
||||
if (e.getCause() instanceof ConstraintViolationException) {
|
||||
validateConstraintViolationBuilder.constraints(e.getMessage());
|
||||
if (e.getCause() instanceof ConstraintViolationException cve) {
|
||||
String friendlyMessage = formatValidationError(cve.getMessage());
|
||||
validateConstraintViolationBuilder.constraints(friendlyMessage);
|
||||
} else {
|
||||
Throwable cause = e.getCause() != null ? e.getCause() : e;
|
||||
validateConstraintViolationBuilder.constraints("Unable to validate the flow: " + cause.getMessage());
|
||||
@@ -456,50 +517,6 @@ public class FlowService {
|
||||
return flowRepository.get().delete(flow);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
|
||||
* As namespace restriction is an EE feature, this will always return true in OSS.
|
||||
*/
|
||||
public boolean isAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that the namespace is allowed from the namespace denoted by 'fromTenant' and 'fromNamespace'.
|
||||
* If not, throw an IllegalArgumentException.
|
||||
*/
|
||||
public void checkAllowedNamespace(String tenant, String namespace, String fromTenant, String fromNamespace) {
|
||||
if (!isAllowedNamespace(tenant, namespace, fromTenant, fromNamespace)) {
|
||||
throw new IllegalArgumentException("Namespace " + namespace + " is not allowed.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if the namespace is allowed from all the namespace in the 'fromTenant' tenant.
|
||||
* As namespace restriction is an EE feature, this will always return true in OSS.
|
||||
*/
|
||||
public boolean areAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that the namespace is allowed from all the namespace in the 'fromTenant' tenant.
|
||||
* If not, throw an IllegalArgumentException.
|
||||
*/
|
||||
public void checkAllowedAllNamespaces(String tenant, String fromTenant, String fromNamespace) {
|
||||
if (!areAllowedAllNamespaces(tenant, fromTenant, fromNamespace)) {
|
||||
throw new IllegalArgumentException("All namespaces are not allowed, you should either filter on a namespace or configure all namespaces to allow your namespace.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if require existing namespace is enabled and the namespace didn't already exist.
|
||||
* As namespace management is an EE feature, this will always return false in OSS.
|
||||
*/
|
||||
public boolean requireExistingNamespace(String tenant, String namespace) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the executable flow for the given namespace, id, and revision.
|
||||
* Warning: this method bypasses ACL so someone with only execution right can create a flow execution
|
||||
@@ -571,4 +588,4 @@ public class FlowService {
|
||||
private IllegalStateException noRepositoryException() {
|
||||
return new IllegalStateException("No repository found. Make sure the `kestra.repository.type` property is set.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -20,9 +20,6 @@ public class KVStoreService {
|
||||
@Inject
|
||||
private StorageInterface storageInterface;
|
||||
|
||||
@Inject
|
||||
private FlowService flowService;
|
||||
|
||||
@Inject
|
||||
private NamespaceService namespaceService;
|
||||
|
||||
@@ -38,7 +35,7 @@ public class KVStoreService {
|
||||
boolean isNotSameNamespace = fromNamespace != null && !namespace.equals(fromNamespace);
|
||||
if (isNotSameNamespace && isNotParentNamespace(namespace, fromNamespace)) {
|
||||
try {
|
||||
flowService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace);
|
||||
namespaceService.checkAllowedNamespace(tenant, namespace, tenant, fromNamespace);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new KVStoreException(String.format(
|
||||
"Cannot access the KV store. Access to '%s' namespace is not allowed from '%s'.", namespace, fromNamespace)
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user