Compare commits

...

134 Commits

Author SHA1 Message Date
YannC.
27f0bfe1ac chore(version): upgrade version to v0.17.26 2025-05-20 14:46:02 +02:00
Loïc Mathieu
0143faed71 fix(system)*: reset the trigger into the KafkaScheduler instead of the ExecutorMain 2025-05-19 12:05:34 +02:00
brian.mulier
20891bf911 chore: upgrade to version 0.17.25 2025-04-09 18:24:27 +02:00
brian.mulier
093ca322bb fix(core)!: prevent failing execution in case of duplicate label upon inheritance 2025-04-09 18:23:56 +02:00
nKwiatkowski
b3b788ca38 feat(Unit Tests): add assertj dependency 2025-04-08 16:06:24 +02:00
nKwiatkowski
a3c85e3c19 fix(test): change EOL centOS docker image 2025-03-19 14:24:06 +01:00
YannC
debca279c0 fix(): align to EE 2025-03-06 10:39:21 +01:00
Florian Hussonnois
254486df63 chore: upgrade to version 0.17.24 2025-01-29 10:59:31 +01:00
Florian Hussonnois
5e6a22c78c fix(webserver): ensure queues are not closed in nioEventLoop 2025-01-29 10:49:00 +01:00
brian.mulier
5613f74b93 chore(version): version 0.17.23 2024-12-19 11:29:38 +01:00
Loïc Mathieu
6a915cb08b chore(version): version v0.17.22 2024-12-13 11:17:44 +01:00
Loïc Mathieu
3056e9c402 feat(core,jdbc): small trigger / scheduler improvements 2024-12-13 11:16:30 +01:00
Loïc Mathieu
dc7fef20b6 Revert "feat(core): remove the execution state from the scheduler (#1588)"
This reverts commit f7d3d0bcd4.
2024-12-13 10:48:54 +01:00
Loïc Mathieu
678205cfc0 fix(core): serialize default inputs 2024-12-13 10:35:09 +01:00
YannC
87acffa052 chore: upgrade version to v0.17.21 2024-10-03 14:27:29 +02:00
Yoann Vernageau
c7f8132bfe feat(core): add label rendering when an execution is triggered (flow and schedule only) (#5025)
* feat: add label rendering when an execution is triggered (flow and schedule only)

* fix: handle IllegalVariableEvaluationException when rendering

* test: add unit tests

* fix: omit label if rendering fails
2024-10-03 14:18:47 +02:00
Loïc Mathieu
af2a73f397 chore: version v0.17.20 2024-09-19 14:28:49 +02:00
Loïc Mathieu
b0cff46b81 chore: version 0.17.19 2024-08-22 16:32:42 +02:00
Loïc Mathieu
5ec053a824 feat(jdbc): avoid loading all excutions in memory 2024-08-22 16:32:42 +02:00
Loïc Mathieu
c2cf0a90e5 feat(jdbc): avoid loading all excutions in memory
Fixes https://github.com/kestra-io/kestra-ee/issues/1262
2024-08-22 16:32:42 +02:00
YannC
0303e62b3d fix(core): In nested flowable, return 0 instead of null output for iterationCount in waitFor task (#4700)
close #4657
2024-08-22 16:32:41 +02:00
Loïc Mathieu
96b67b73bd fix(jdbc-postgres): remove not-needed serialization in Postgres queue 2024-08-22 16:32:41 +02:00
Loïc Mathieu
6cced7a9e7 chore: upgrade to version 0.17.18 2024-08-08 17:46:14 +02:00
Miloš Paunović
9259a805d8 fix(ui): graciously handle situation with no tutorial flows loaded (#4609) 2024-08-08 17:35:57 +02:00
eric
595459fc12 fix(ui): improve date handling per locale (#4600) 2024-08-08 17:33:24 +02:00
YannC
42a637de7c chore: upgrade to 0.17.17 2024-08-01 14:22:10 +02:00
YannC
ba7d4d9501 fix(): allow create flow and allow save when namespace is allowed (#4512)
close kestraio/kestra-ee#1443
2024-08-01 14:20:50 +02:00
Miloš Paunović
76b5022c08 fix(ui): casting boolean values on resume dialog (#4498) 2024-08-01 14:20:43 +02:00
Miloš Paunović
1620c21c36 fix(ui): show tabs on blueprints for custom ones (#4493) 2024-08-01 14:20:37 +02:00
Miloš Paunović
202c321a8a fix(ui): amend editor height under namespace tabs (#4492) 2024-08-01 14:20:19 +02:00
Miloš Paunović
ea0b4e7469 fix(ui): handle invalid task run when showing logs fro execution topology view (#4482) 2024-08-01 14:20:10 +02:00
Miloš Paunović
c0975773a3 fix(ui): filter out falsy values for icons list (#4481) 2024-08-01 14:19:35 +02:00
Miloš Paunović
23e7af0d77 fix(ui): properly set key attribute to log-line component to render required content (#4468) 2024-08-01 14:19:02 +02:00
Miloš Paunović
dcfc9acf74 fix(ui): properly update low code editor on metadata editing (#4464) 2024-08-01 14:18:03 +02:00
Loïc Mathieu
b790d5e1d1 chore: remove -break-system-packages 2024-07-30 16:27:46 +02:00
Loïc Mathieu
5213945a41 chore: upgrade to 0.17.16 2024-07-30 10:33:34 +02:00
Loïc Mathieu
150eed2eff chore: use 21-jre-jammy base image 2024-07-30 10:32:35 +02:00
Loïc Mathieu
887a3a5dc4 chore: update to version 0.17.15 2024-07-29 09:34:02 +02:00
Miloš Paunović
d7109aa375 fix(ui): show proper code value in blueprint detail editor (#4444) 2024-07-29 09:33:20 +02:00
Loïc Mathieu
f2a33ebbca Revert "fix(script): task runner override"
This reverts commit b2d04925cb.
2024-07-29 09:32:37 +02:00
YannC
b130ac8de9 fix: dockerfile python --break-system-packages option 2024-07-26 08:58:05 +02:00
YannC
8f43ec13f3 chore: update to version 0.17.14 2024-07-25 18:42:15 +02:00
Loïc Mathieu
5ae985ab16 fix(core): HTTP tests 2024-07-25 18:42:15 +02:00
Loïc Mathieu
b2d04925cb fix(script): task runner override
Lombok is buggy when working with `@Builder.Default` it didn't take into accont overrides from properties so you should use getter everywhere.

Fixes https://github.com/kestra-io/plugin-dbt/issues/114
2024-07-25 14:03:51 +02:00
Miloš Paunović
37ff254c9c fix(ui): passing valid attributes to method which handles edit event in low code editor (#4409) 2024-07-25 14:02:24 +02:00
Miloš Paunović
7e968d16b6 fix(ui): properly load flows for editor 2024-07-25 14:02:19 +02:00
Frank Tianyu Zeng
2e4bf3338a chore(ui): enhanced email and password length constraints in basic authentication (#4261) 2024-07-25 14:00:17 +02:00
Miloš Paunović
0f3c455afc fix(ui): allowing files without extensions to be created (#4390) 2024-07-25 13:59:57 +02:00
Miloš Paunović
7439ea4a66 fix(ui): only stringify inputs if they're not strings already (#4391)
* fix(ui): only stringify inputs if they're not strings already

* chore(ui): removed obsolete code
2024-07-25 13:59:20 +02:00
Loïc Mathieu
6546ce49f6 fix(ui): template migration guide has moved 2024-07-25 13:59:17 +02:00
Loïc Mathieu
57bc235db6 chore: update to version 0.17.13 2024-07-18 14:42:42 +02:00
YannC
2f416daac0 fix: allow null enum when required is false 2024-07-18 14:25:53 +02:00
Miloš Paunović
97779f3bc4 fix(ui): handle file names without extensions (#4352) 2024-07-18 14:25:31 +02:00
Loïc Mathieu
54c4f1d702 fix(ui): proprely handle json type inputs when prefilling executions (#4333) 2024-07-18 14:25:09 +02:00
Miloš Paunović
f2cc5c0da6 fix(ui): properly handle array input types (#4331) 2024-07-18 14:23:54 +02:00
Yoann Vernageau
c2fc728414 fix(core): keep key order when rendering a Map 2024-07-18 14:23:03 +02:00
Loïc Mathieu
90492354e2 fix: merge issue 2024-07-11 15:00:19 +02:00
Loïc Mathieu
856a5f5a73 chore: upgrade to version v0.17.12 2024-07-11 14:33:02 +02:00
Loïc Mathieu
f09fa74129 fix(core, jdbc): purge deleted executions
part-of: https://github.com/kestra-io/kestra/issues/3961
2024-07-11 14:32:01 +02:00
Loïc Mathieu
936e4019b6 fix(core): add trigger type to worker trigger metrics
Fixes #4259
2024-07-11 14:31:43 +02:00
Loïc Mathieu
4c44090462 feat(core): add global jobs metrics to the server
Those metrics will allow to autoscale Worker pod based
2024-07-11 14:31:20 +02:00
Loïc Mathieu
9feac234f6 chore: upgrade to version 0.17.11 2024-07-08 14:43:42 +02:00
Florian Hussonnois
935212344c chore: upgrade to version 'v0.17.10' 2024-07-05 16:37:53 +02:00
Florian Hussonnois
5d4c4dc214 fix(core): fix processing of execution killed with tenant (#4173) 2024-07-05 16:33:23 +02:00
Miloš Paunović
a152354bcd chore(ui): redirect to welcome page if there are no flows (#4246)
* chore(ui): redirect to welcome page if there are no flows

* refactor(ui): removed duplicated home route definition
2024-07-05 16:32:50 +02:00
brian.mulier
08455bbdf0 chore: version 0.17.9 2024-07-05 11:35:18 +02:00
Miloš Paunović
64e9d8d43f fix(ui): increase the width of absolute date filter (#4242) 2024-07-05 11:34:48 +02:00
yuri
dd1b435720 feat(ui): humanize cron expressions (#4233)
* Added the **cRonstrue** dependency.
* Enabled English/French localization.
* Enabled the readable cron expression on Flow listing.
* Refactored the app locale config access.
2024-07-05 11:34:48 +02:00
Miloš Paunović
9fca8c9148 fix(ui): make sure labels have both key and value fields filled in (#4227) 2024-07-05 11:15:08 +02:00
Miloš Paunović
09d6e0f092 fix(ui): only validate graph is the flow is valid (#4161) 2024-07-05 11:14:58 +02:00
Loïc Mathieu
c3abfe08ad fix(core): FileService render twice the input files
Fixes #4093
2024-07-05 11:14:47 +02:00
Miloš Paunović
f2d5df082d fix(ui): parsing the date input using the proper value format (#4153) 2024-07-05 11:14:33 +02:00
Miloš Paunović
16c15116fe fix(ui): properly handle view type on namespace files (#4138) 2024-07-05 11:14:16 +02:00
Loïc Mathieu
f9f8d93ad7 chore: version 0.17.8 2024-06-27 11:26:22 +02:00
Loïc Mathieu
c3b1ceb289 Revert "fix(ui): properly handle view type on namespace files (#4138)"
This reverts commit dcec04e20e.
2024-06-27 11:21:57 +02:00
Loïc Mathieu
f9f33b96c8 chore: version 0.17.7 2024-06-27 10:40:31 +02:00
Miloš Paunović
658f847c48 chore(ui): improved the errors page (#4149) 2024-06-27 10:39:57 +02:00
Miloš Paunović
dcec04e20e fix(ui): properly handle view type on namespace files (#4138) 2024-06-27 10:38:39 +02:00
Miloš Paunović
b38ec8a21b fix(ui): only append date values to inputs form if they exist (#4135) 2024-06-27 10:38:13 +02:00
Miloš Paunović
35d801cbc2 chore: allow importing multiple files (#4128) 2024-06-27 10:37:35 +02:00
brian.mulier
511a10a65c fix(core): flows from one tenant don't erase those from others 2024-06-27 10:37:01 +02:00
Loïc Mathieu
c5daebe4aa fix(core): use the flow revision from the condition context instead of the trigger context
Fixes #3970
May also fix #4059 but we need to reproduce before to be sure
2024-06-27 10:36:45 +02:00
Florian Hussonnois
bb800948e5 chore: upgrade to version 'v0.17.6' 2024-06-25 16:42:44 +02:00
Florian Hussonnois
65f921d456 chore: add optional sequential id to executor
This commit adds an optional seqId property to the
Executor class that can be used to detect concurrent/stale updates
on execution.
2024-06-25 16:40:58 +02:00
YannC
ded21b0902 fix(jdbc): add timezone in JDBC url as it was using default JVM timezone (#4114) 2024-06-25 16:40:40 +02:00
Loïc Mathieu
7b109baac2 fix(core): the Request task can crash the worker
Fixes #4115
2024-06-25 16:40:28 +02:00
Loïc Mathieu
c11fe3466f fix(script): bad merge 2024-06-21 16:56:35 +02:00
Loïc Mathieu
94c5b7a6e4 chore: version 0.17.5 2024-06-21 16:54:16 +02:00
Loïc Mathieu
99ab5be8b9 fix(ui): amended editor content when switching from topology view (#4091) 2024-06-21 16:51:35 +02:00
Loïc Mathieu
aaa3a0ace0 fix(core): possible NPE on namespace files usage
Fixes #4078
2024-06-21 16:50:18 +02:00
brian.mulier
acc5a24d9a fix(core): add secret consumer when rendering variables for subflows
closes kestra-io/kestra-ee#1259
2024-06-20 15:04:32 +02:00
brian.mulier
892bb114ca chore: upgrade to version 'v0.17.4' 2024-06-17 12:41:03 +02:00
YannC
30e4fe4e0b fix(): namespaces files working in script tasks on Windows 2024-06-17 12:41:03 +02:00
brian.mulier
fcada08edd fix(core): nullable tenants & executions for execution skips 2024-06-17 12:32:01 +02:00
brian.mulier
a2df125a62 feat(*): skip executions for a tenant
part of kestra-io/kestra-ee#1247
2024-06-17 12:31:05 +02:00
brian.mulier
9d9c5dc1d1 fix(*): add tenant id to namespace identifier for skip execution by namespace
part of kestra-io/kestra-ee#1247
2024-06-17 12:30:55 +02:00
brian.mulier
dbb1a8eaa5 feat(*): skip executions for a namespace
part of kestra-io/kestra-ee#1247
2024-06-17 12:30:45 +02:00
YannC
1db6b57091 chore: upgrade to version 'v0.17.3' 2024-06-14 22:35:27 +02:00
Loïc Mathieu
934ea201a5 fix(webserver): add plugin alias icons
Fixes #4030
2024-06-14 22:21:29 +02:00
YannC
5dcd5b5af8 fix(ui): no stringify of json inputs (already string as coming form string input)
close #4033
2024-06-14 22:19:56 +02:00
YannC
000124f3dd fix(ui): only throw flow/execution not found on sse error when execution is not populated
close #4034
2024-06-14 22:19:53 +02:00
YannC
c37f104446 feat(ui): better handle of no permission (#4004) 2024-06-14 09:19:19 +02:00
yuri
3cfa48987f fix(ui): allow colon mark in label value (#4027) 2024-06-14 08:59:59 +02:00
Miloš Paunović
79c22ee22c chore(ui): only validate yaml files if they are flows (#4017) 2024-06-14 08:59:42 +02:00
Loïc Mathieu
d3a2fa13a5 fix(webserver): trim the flow when importing
Otherwhise the flows (except the first one) will have an empty line at the begining of the surce due to the way we split multiple flows.

Fixes #3915
2024-06-14 08:59:31 +02:00
Frank Tianyu Zeng
20078f1e19 fix(ui): improve the readability of error message in flows (#3901) 2024-06-14 08:59:19 +02:00
Miloš Paunović
72b86d9edf chore(ui): editor improvements (#4005)
* fix(ui): editor cursor position on windows is now recalculated properly

* fix(ui): binding file explorer context menu to id property instead of name
2024-06-14 08:58:57 +02:00
Miloš Paunović
59634133bc fix(ui): properly parsing json files in the editor (#4007) 2024-06-14 08:58:47 +02:00
Miloš Paunović
1e34a5528b chore(ui): added note for timezone settings (#3968) 2024-06-14 08:58:01 +02:00
Miloš Paunović
4aa3bd3ef2 chore(ui): added min and max values for int and float input types on flow execution (#3956) 2024-06-14 08:57:56 +02:00
YannC
f6581de304 fix(): replace Windows \ for / in LocalStorage 2024-06-14 08:57:42 +02:00
Florian Hussonnois
fd225d87b4 fix(core): properly inject pluginConfiguration for WorkingDirectory task (#4006)
fix: #4006
2024-06-14 08:57:36 +02:00
brian-mulier-p
9bb3f576ee fix(core): decrypt outputs for tasks within WorkingDirectory (#4001)
closes #4000
2024-06-14 08:57:28 +02:00
Florian Hussonnois
30cdb373cc fix(core): add unique prefix identifier for output files (#3991)
fix: #3991
2024-06-14 08:57:16 +02:00
YannC
59c7d6a567 chore: upgrade to version 'v0.17.2' 2024-06-10 16:24:37 +02:00
brian.mulier
9e4e5f891e fix(ui): namespace files calls were not including tenant 2024-06-10 16:23:42 +02:00
Milos Paunovic
ea3ba991d1 fix(ui): amended output preview for sqs trigger messages for ion files 2024-06-10 16:23:11 +02:00
Miloš Paunović
1024c77289 chore(ui): showing ee tooltip only on click (#3951) 2024-06-10 16:23:04 +02:00
Miloš Paunović
36b29d6065 chore(ui): showing ee tooltip on hover only once, then, just on click (#3944) 2024-06-10 16:22:57 +02:00
YannC
1c8177e185 chore: upgrade to version 0.17.1 2024-06-05 22:38:53 +02:00
brian.mulier
3dd5d6bb71 fix(ui): prevent the need of loading all flows for Flow tab to be displayed in editor 2024-06-05 22:38:53 +02:00
YannC
16a641693a fix(ui): avoid 404 with autocomplete when flow does not exist 2024-06-05 22:21:07 +02:00
YannC
efdb075155 fix(core): Now accept an extension for the file input
close #3858
2024-06-05 22:21:02 +02:00
Miloš Paunović
a99d52a406 fix(ui): added safety checks for all tour related calls (#3938) 2024-06-05 22:20:53 +02:00
YannC
852edea36e fix(ui): dont count flow in tutorial namespace 2024-06-05 22:20:45 +02:00
brian.mulier
defa426259 fix(ui): null-safe guided tour access in TriggerFlow.vue 2024-06-05 22:20:38 +02:00
Miloš Paunović
3aadcfd683 fix(ui): flow default inputs are now properly populated (#3934) 2024-06-05 22:20:30 +02:00
YannC
0f5d59103a fix(core): remove @NotEmpty
close #3920
2024-06-05 22:20:16 +02:00
YannC
50b9120434 fix(core): UploadFiles now handle subfolders 2024-06-05 22:19:53 +02:00
Anna Geller
896c761502 feat: switch from contact-us to demo 2024-06-05 22:19:39 +02:00
Loïc Mathieu
381d1b381f chore: fix docker image build 2024-06-04 15:29:51 +02:00
Loïc Mathieu
72a428a439 core: add default 'true' to docker task 2024-06-04 14:45:57 +02:00
Loïc Mathieu
7447e61dbc chore: fix docker workflow variable computation 2024-06-04 14:45:52 +02:00
Loïc Mathieu
45ffc3cc22 fix: Maven description 2024-06-04 11:11:48 +02:00
150 changed files with 2492 additions and 1318 deletions

View File

@@ -7,14 +7,7 @@ on:
description: 'Retag latest Docker images'
required: true
type: string
options:
- "true"
- "false"
skip-test:
description: 'Skip test'
required: false
type: string
default: "false"
default: "true"
options:
- "true"
- "false"
@@ -125,6 +118,16 @@ jobs:
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip
python-libs: kestra
steps:
- uses: actions/checkout@v4
# Vars
- name: Set image name
id: vars
run: |
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
# Download release
- name: Download release
uses: robinraju/release-downloader@v1.10
@@ -137,14 +140,6 @@ jobs:
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Vars
- name: Set image name
id: vars
run: |
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
# Docker setup
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
@@ -179,7 +174,7 @@ jobs:
- name: Retag to latest
if: github.event.inputs.retag-latest == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{1}', matrix.image.name) }}
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest

View File

@@ -1,4 +1,4 @@
FROM eclipse-temurin:21-jre
FROM eclipse-temurin:21-jre-jammy
ARG KESTRA_PLUGINS=""
ARG APT_PACKAGES=""

View File

@@ -204,6 +204,8 @@ subprojects {
testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'org.hamcrest:hamcrest-library:2.2'
testImplementation group: 'org.exparity', name: 'hamcrest-date', version: '2.0.8'
testImplementation 'org.assertj:assertj-core:3.27.3'
}
test {
@@ -454,7 +456,7 @@ subprojects {
}
maven.pom {
description 'The modern, scalable orchestrator & scheduler open source platform'
description = 'The modern, scalable orchestrator & scheduler open source platform'
developers {
developer {

View File

@@ -37,6 +37,12 @@ public class ExecutorCommand extends AbstractServerCommand {
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipFlows = Collections.emptyList();
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipNamespaces = Collections.emptyList();
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipTenants = Collections.emptyList();
@CommandLine.Option(names = {"--start-executors"}, split=",", description = "a list of Kafka Stream executors to start, separated by a command. Use it only with the Kafka queue, for debugging purpose.")
private List<String> startExecutors = Collections.emptyList();
@@ -54,6 +60,8 @@ public class ExecutorCommand extends AbstractServerCommand {
public Integer call() throws Exception {
this.skipExecutionService.setSkipExecutions(skipExecutions);
this.skipExecutionService.setSkipFlows(skipFlows);
this.skipExecutionService.setSkipNamespaces(skipNamespaces);
this.skipExecutionService.setSkipTenants(skipTenants);
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);

View File

@@ -32,7 +32,7 @@ public class LocalCommand extends StandAloneCommand {
"kestra.queue.type", "h2",
"kestra.storage.type", "local",
"kestra.storage.local.base-path", data.toString(),
"datasources.h2.url", "jdbc:h2:file:" + data.resolve("database") + ";DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=30000",
"datasources.h2.url", "jdbc:h2:file:" + data.resolve("database") + ";TIME ZONE=UTC;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=30000",
"datasources.h2.username", "sa",
"datasources.h2.password", "",
"datasources.h2.driverClassName", "org.h2.Driver",

View File

@@ -49,6 +49,12 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipFlows = Collections.emptyList();
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipNamespaces = Collections.emptyList();
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipTenants = Collections.emptyList();
@CommandLine.Option(names = {"--no-tutorials"}, description = "Flag to disable auto-loading of tutorial flows.")
boolean tutorialsDisabled = false;
@@ -74,6 +80,8 @@ public class StandAloneCommand extends AbstractServerCommand {
public Integer call() throws Exception {
this.skipExecutionService.setSkipExecutions(skipExecutions);
this.skipExecutionService.setSkipFlows(skipFlows);
this.skipExecutionService.setSkipNamespaces(skipNamespaces);
this.skipExecutionService.setSkipTenants(skipTenants);
this.startExecutorService.applyOptions(startExecutors, notStartExecutors);

View File

@@ -23,10 +23,12 @@ import org.apache.commons.lang3.ArrayUtils;
@Singleton
@Slf4j
public class MetricRegistry {
public final static String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending";
public final static String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running";
public final static String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread";
public final static String METRIC_WORKER_RUNNING_COUNT = "worker.running.count";
public final static String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration";
public final static String METRIC_WORKER_STARTED_COUNT = "worker.started.count";
public final static String METRIC_WORKER_RETRYED_COUNT = "worker.retryed.count";
public final static String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
public final static String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
public final static String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
@@ -143,7 +145,7 @@ public class MetricRegistry {
*
* @param workerTask the current WorkerTask
* @param workerGroup the worker group, optional
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(WorkerTask workerTask, String workerGroup, String... tags) {
var baseTags = ArrayUtils.addAll(
@@ -164,7 +166,7 @@ public class MetricRegistry {
*
* @param workerTrigger the current WorkerTask
* @param workerGroup the worker group, optional
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(WorkerTrigger workerTrigger, String workerGroup, String... tags) {
var baseTags = ArrayUtils.addAll(
@@ -184,7 +186,7 @@ public class MetricRegistry {
* Return tags for current {@link WorkerTaskResult}
*
* @param workerTaskResult the current WorkerTaskResult
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(WorkerTaskResult workerTaskResult, String... tags) {
var baseTags = ArrayUtils.addAll(
@@ -200,7 +202,7 @@ public class MetricRegistry {
* Return tags for current {@link WorkerTaskResult}
*
* @param subflowExecutionResult the current WorkerTaskResult
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(SubflowExecutionResult subflowExecutionResult, String... tags) {
var baseTags = ArrayUtils.addAll(
@@ -216,7 +218,7 @@ public class MetricRegistry {
* Return tags for current {@link Task}
*
* @param task the current Task
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(Task task) {
return new String[]{
@@ -240,7 +242,7 @@ public class MetricRegistry {
* Return tags for current {@link Execution}
*
* @param execution the current Execution
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(Execution execution) {
var baseTags = new String[]{
@@ -255,33 +257,21 @@ public class MetricRegistry {
* Return tags for current {@link TriggerContext}
*
* @param triggerContext the current TriggerContext
* @param workerGroup the worker group, optional
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(TriggerContext triggerContext, String workerGroup) {
public String[] tags(TriggerContext triggerContext) {
var baseTags = new String[]{
TAG_FLOW_ID, triggerContext.getFlowId(),
TAG_NAMESPACE_ID, triggerContext.getNamespace()
};
baseTags = workerGroup == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_WORKER_GROUP, workerGroup);
return triggerContext.getTenantId() == null ? baseTags : ArrayUtils.addAll(baseTags, TAG_TENANT_ID, triggerContext.getTenantId());
}
/**
* Return tags for current {@link TriggerContext}
*
* @param triggerContext the current TriggerContext
* @return tags to applied to metrics
*/
public String[] tags(TriggerContext triggerContext) {
return tags(triggerContext, null);
}
/**
* Return tags for current {@link SchedulerExecutionWithTrigger}.
*
* @param schedulerExecutionWithTrigger the current SchedulerExecutionWithTrigger
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public String[] tags(SchedulerExecutionWithTrigger schedulerExecutionWithTrigger, String... tags) {
return ArrayUtils.addAll(
@@ -294,7 +284,7 @@ public class MetricRegistry {
/**
* Return globals tags
*
* @return tags to applied to metrics
* @return tags to apply to metrics
*/
public Tags tags(String... tags) {
return Tags.of(tags);

View File

@@ -9,6 +9,22 @@ import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
/**
* The Kestra event for killing an execution. A {@link ExecutionKilled} can be in two states:
* <p>
* <pre>
* - {@link State#REQUESTED}: The event was requested either by an Executor or by an external request.
* - {@link State#EXECUTED}: The event was consumed and processed by the Executor.
* </pre>
*
* A {@link ExecutionKilled} will always transit from {@link State#REQUESTED} to {@link State#EXECUTED}
* regardless of whether the associated execution exist or not to ensure that Workers will be notified for the tasks
* to be killed no matter what the circumstances.
* <p>
* IMPORTANT: A {@link ExecutionKilled} is considered to be a fire-and-forget event. As a result, we do not manage a
* COMPLETED state, i.e., the Executor will never wait for Workers to process an executed {@link ExecutionKilled}
* before considering an execution to be KILLED.
*/
@Getter
@SuperBuilder
@EqualsAndHashCode

View File

@@ -8,22 +8,6 @@ import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
/**
* The Kestra event for killing an execution. A {@link ExecutionKilledExecution} can be in two states:
* <p>
* <pre>
* - {@link State#REQUESTED}: The event was requested either by an Executor or by an external request.
* - {@link State#EXECUTED}: The event was consumed and processed by the Executor.
* </pre>
*
* A {@link ExecutionKilledExecution} will always transit from {@link State#REQUESTED} to {@link State#EXECUTED}
* regardless of whether the associated execution exist or not to ensure that Workers will be notified for the tasks
* to be killed no matter what the circumstances.
* <p>
* IMPORTANT: A {@link ExecutionKilledExecution} is considered to be a fire-and-forget event. As a result, we do not manage a
* COMPLETED state, i.e., the Executor will never wait for Workers to process an executed {@link ExecutionKilledExecution}
* before considering an execution to be KILLED.
*/
@Getter
@SuperBuilder
@EqualsAndHashCode
@@ -47,8 +31,9 @@ public class ExecutionKilledExecution extends ExecutionKilled implements TenantI
Boolean isOnKillCascade;
public boolean isEqual(WorkerTask workerTask) {
return (workerTask.getTaskRun().getTenantId() == null || (workerTask.getTaskRun().getTenantId() != null && workerTask.getTaskRun().getTenantId().equals(this.tenantId))) &&
workerTask.getTaskRun().getExecutionId().equals(this.executionId);
String taskTenantId = workerTask.getTaskRun().getTenantId();
String taskExecutionId = workerTask.getTaskRun().getExecutionId();
return (taskTenantId == null || taskTenantId.equals(this.tenantId)) && taskExecutionId.equals(this.executionId);
}
@Override

View File

@@ -1,6 +1,5 @@
package io.kestra.core.models.flows;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@@ -39,7 +38,6 @@ import jakarta.validation.constraints.Pattern;
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
@JsonSubTypes.Type(value = URIInput.class, name = "URI")
})
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public abstract class Input<T> implements Data {
@NotNull
@NotBlank

View File

@@ -24,7 +24,7 @@ public class EnumInput extends Input<String> {
@Override
public void validate(String input) throws ConstraintViolationException {
if (!values.contains(input)) {
if (!values.contains(input) & this.getRequired()) {
throw ManualConstraintViolation.toConstraintViolationException(
"it must match the values `" + values + "`",
this,

View File

@@ -1,18 +1,21 @@
package io.kestra.core.models.flows.input;
import io.kestra.core.models.flows.Input;
import jakarta.validation.ConstraintViolationException;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.net.URI;
import jakarta.validation.ConstraintViolationException;
@SuperBuilder
@Getter
@NoArgsConstructor
public class FileInput extends Input<URI> {
@Builder.Default
public String extension = ".upl";
@Override
public void validate(URI input) throws ConstraintViolationException {
// no validation yet

View File

@@ -26,9 +26,6 @@ public class Trigger extends TriggerContext {
@Nullable
private String executionId;
@Nullable
private State.Type executionCurrentState;
@Nullable
private Instant updatedDate;
@@ -38,7 +35,6 @@ public class Trigger extends TriggerContext {
protected Trigger(TriggerBuilder<?, ?> b) {
super(b);
this.executionId = b.executionId;
this.executionCurrentState = b.executionCurrentState;
this.updatedDate = b.updatedDate;
this.evaluateRunningDate = b.evaluateRunningDate;
}
@@ -79,7 +75,7 @@ public class Trigger extends TriggerContext {
}
public String flowUid() {
return Flow.uid(this.getTenantId(), this.getNamespace(), this.getFlowId(), Optional.of(this.getFlowRevision()));
return Flow.uidWithoutRevision(this.getTenantId(), this.getNamespace(), this.getFlowId());
}
/**
@@ -90,7 +86,6 @@ public class Trigger extends TriggerContext {
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.triggerId(abstractTrigger.getId())
.stopAfter(abstractTrigger.getStopAfter())
.build();
@@ -137,12 +132,10 @@ public class Trigger extends TriggerContext {
.tenantId(execution.getTenantId())
.namespace(execution.getNamespace())
.flowId(execution.getFlowId())
.flowRevision(execution.getFlowRevision())
.triggerId(execution.getTrigger().getId())
.date(trigger.getDate())
.nextExecutionDate(trigger.getNextExecutionDate())
.executionId(execution.getId())
.executionCurrentState(execution.getState().getCurrent())
.updatedDate(Instant.now())
.backfill(trigger.getBackfill())
.stopAfter(trigger.getStopAfter())
@@ -175,7 +168,6 @@ public class Trigger extends TriggerContext {
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.triggerId(abstractTrigger.getId())
.date(ZonedDateTime.now().truncatedTo(ChronoUnit.SECONDS))
.nextExecutionDate(nextDate)
@@ -225,7 +217,6 @@ public class Trigger extends TriggerContext {
.tenantId(this.getTenantId())
.namespace(this.getNamespace())
.flowId(this.getFlowId())
.flowRevision(this.getFlowRevision())
.triggerId(this.getTriggerId())
.date(this.getDate())
.nextExecutionDate(nextExecutionDate)
@@ -240,7 +231,6 @@ public class Trigger extends TriggerContext {
.tenantId(this.getTenantId())
.namespace(this.getNamespace())
.flowId(this.getFlowId())
.flowRevision(this.getFlowRevision())
.triggerId(this.getTriggerId())
.date(this.getDate())
.nextExecutionDate(this.getNextExecutionDate())
@@ -301,7 +291,6 @@ public class Trigger extends TriggerContext {
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.flowRevision(triggerContext.getFlowRevision())
.triggerId(triggerContext.getTriggerId())
.date(triggerContext.getDate())
.backfill(triggerContext.getBackfill())

View File

@@ -29,9 +29,6 @@ public class TriggerContext {
@NotNull
private String flowId;
@NotNull
private Integer flowRevision;
@NotNull
private String triggerId;
@@ -53,7 +50,6 @@ public class TriggerContext {
this.tenantId = b.tenantId;
this.namespace = b.namespace;
this.flowId = b.flowId;
this.flowRevision = b.flowRevision;
this.triggerId = b.triggerId;
this.date = b.date;
this.nextExecutionDate = b.nextExecutionDate;

View File

@@ -1,7 +1,5 @@
package io.kestra.core.models.triggers;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
@@ -22,7 +20,7 @@ public abstract class TriggerService {
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, variables);
RunContext runContext = conditionContext.getRunContext();
return generateExecution(runContext.getTriggerExecutionId(), trigger, context, executionTrigger);
return generateExecution(runContext.getTriggerExecutionId(), trigger, context, executionTrigger, conditionContext.getFlow().getRevision());
}
public static Execution generateExecution(
@@ -34,30 +32,32 @@ public abstract class TriggerService {
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, output);
RunContext runContext = conditionContext.getRunContext();
return generateExecution(runContext.getTriggerExecutionId(), trigger, context, executionTrigger);
return generateExecution(runContext.getTriggerExecutionId(), trigger, context, executionTrigger, conditionContext.getFlow().getRevision());
}
public static Execution generateRealtimeExecution(
AbstractTrigger trigger,
ConditionContext conditionContext,
TriggerContext context,
Output output
) {
ExecutionTrigger executionTrigger = ExecutionTrigger.of(trigger, output);
return generateExecution(IdUtils.create(), trigger, context, executionTrigger);
return generateExecution(IdUtils.create(), trigger, context, executionTrigger, conditionContext.getFlow().getRevision());
}
private static Execution generateExecution(
String id,
AbstractTrigger trigger,
TriggerContext context,
ExecutionTrigger executionTrigger
ExecutionTrigger executionTrigger,
Integer flowRevision
) {
return Execution.builder()
.id(id)
.namespace(context.getNamespace())
.flowId(context.getFlowId())
.flowRevision(context.getFlowRevision())
.flowRevision(flowRevision)
.state(new State())
.trigger(executionTrigger)
.labels(trigger.getLabels() == null ? null : trigger.getLabels())

View File

@@ -51,7 +51,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
@Nullable ChildFilter childFilter
);
Flux<Execution> find(
default Flux<Execution> find(
@Nullable String query,
@Nullable String tenantId,
@Nullable String namespace,
@@ -62,6 +62,22 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
@Nullable Map<String, String> labels,
@Nullable String triggerExecutionId,
@Nullable ChildFilter childFilter
) {
return find(query, tenantId, namespace, flowId, startDate, endDate, state, labels, triggerExecutionId, childFilter, false);
}
Flux<Execution> find(
@Nullable String query,
@Nullable String tenantId,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state,
@Nullable Map<String, String> labels,
@Nullable String triggerExecutionId,
@Nullable ChildFilter childFilter,
boolean allowDeleted
);
ArrayListTotal<TaskRun> findTaskRun(

View File

@@ -35,6 +35,11 @@ public class Executor {
private ExecutionResumed executionResumed;
private ExecutionResumed joinedExecutionResumed;
/**
* The sequence id should be incremented each time the execution is persisted after mutation.
*/
private long seqId = 0L;
/**
* List of {@link ExecutionKilled} to be propagated part of the execution.
*/
@@ -45,6 +50,12 @@ public class Executor {
this.offset = offset;
}
public Executor(Execution execution, Long offset, long seqId) {
this.execution = execution;
this.offset = offset;
this.seqId = seqId;
}
public Executor(WorkerTaskResult workerTaskResult) {
this.joinedWorkerTaskResult = workerTaskResult;
}
@@ -148,7 +159,18 @@ public class Executor {
public Executor serialize() {
return new Executor(
this.execution,
this.offset
this.offset,
this.seqId
);
}
/**
* Increments and returns the execution sequence id.
*
* @return the sequence id.
*/
public long incrementAndGetSeqId() {
this.seqId++;
return seqId;
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.tasks.runners.PluginUtilsService;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
@@ -44,17 +45,16 @@ public abstract class FilesService {
file.getParentFile().mkdirs();
}
var fileContent = runContext.render(input, additionalVars);
if (fileContent == null) {
if (input == null) {
file.createNewFile();
} else {
if (fileContent.startsWith("kestra://")) {
try (var is = runContext.storage().getFile(URI.create(fileContent));
if (input.startsWith("kestra://")) {
try (var is = runContext.storage().getFile(URI.create(input));
var out = new FileOutputStream(file)) {
IOUtils.copyLarge(is, out);
}
} else {
Files.write(file.toPath(), fileContent.getBytes());
Files.write(file.toPath(), input.getBytes());
}
}
}));
@@ -85,10 +85,14 @@ public abstract class FilesService {
.filter(path -> pathMatcher.matches(runContext.tempDir().relativize(path)))
.map(throwFunction(path -> new AbstractMap.SimpleEntry<>(
runContext.tempDir().relativize(path).toString(),
runContext.storage().putFile(path.toFile())
runContext.storage().putFile(path.toFile(), resolveUniqueNameForFile(path))
)))
.toList()
.stream();
}
}
private static String resolveUniqueNameForFile(final Path path) {
return IdUtils.from(path.toString()) + "-" + path.toFile().getName();
}
}

View File

@@ -8,6 +8,7 @@ import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Input;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.ArrayInput;
import io.kestra.core.models.flows.input.FileInput;
import io.kestra.core.models.tasks.common.EncryptedString;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageInterface;
@@ -85,7 +86,9 @@ public class FlowInputOutput {
.subscribeOn(Schedulers.boundedElastic())
.map(throwFunction(input -> {
if (input instanceof CompletedFileUpload fileUpload) {
File tempFile = File.createTempFile(fileUpload.getFilename() + "_", ".upl");
String fileExtension = inputs.stream().filter(flowInput -> flowInput instanceof FileInput && flowInput.getId().equals(fileUpload.getFilename())).map(flowInput -> ((FileInput) flowInput).getExtension()).findFirst().orElse(".upl");
fileExtension = fileExtension.startsWith(".") ? fileExtension : "." + fileExtension;
File tempFile = File.createTempFile(fileUpload.getFilename() + "_", fileExtension);
try (var inputStream = fileUpload.getInputStream();
var outputStream = new FileOutputStream(tempFile)) {
long transferredBytes = inputStream.transferTo(outputStream);

View File

@@ -14,6 +14,7 @@ import io.kestra.core.services.FlowListenersInterface;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -98,13 +99,13 @@ public class FlowListeners implements FlowListenersInterface {
private Optional<Flow> previous(Flow flow) {
return flows
.stream()
.filter(r -> r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()))
.filter(r -> Objects.equals(r.getTenantId(), flow.getTenantId()) && r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()))
.findFirst();
}
private boolean remove(Flow flow) {
synchronized (this) {
boolean remove = flows.removeIf(r -> r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()));
boolean remove = flows.removeIf(r -> Objects.equals(r.getTenantId(), flow.getTenantId()) && r.getNamespace().equals(flow.getNamespace()) && r.getId().equals(flow.getId()));
if (!remove && flow.isDeleted()) {
log.warn("Can't remove flow {}.{}", flow.getNamespace(), flow.getId());
}

View File

@@ -31,7 +31,7 @@ public class NamespaceFilesService {
private StorageInterface storageInterface;
public List<URI> inject(RunContext runContext, String tenantId, String namespace, Path basePath, NamespaceFiles namespaceFiles) throws Exception {
if (!namespaceFiles.getEnabled()) {
if (!Boolean.TRUE.equals(namespaceFiles.getEnabled())) {
return Collections.emptyList();
}

View File

@@ -339,7 +339,7 @@ public class RunContext {
if (execution.getTaskRunList() != null) {
Map<String, Object> outputs = new HashMap<>(execution.outputs());
if (decryptVariables) {
decryptOutputs(outputs);
outputs = decryptOutputs(outputs);
}
builder.put("outputs", outputs);
}
@@ -401,25 +401,37 @@ public class RunContext {
));
}
if (this.runContextLogger != null) {
builder.put("addSecretConsumer", (Consumer<String>) s -> this.runContextLogger.usedSecret(s));
}
return builder.build();
}
private void decryptOutputs(Map<String, Object> outputs) {
for (var entry: outputs.entrySet()) {
private Map<String, Object> decryptOutputs(Map<String, Object> mapToDecrypt) {
if (mapToDecrypt == null) {
return null;
}
Map<String, Object> decryptedMap = new HashMap<>();
for (var entry: mapToDecrypt.entrySet()) {
decryptedMap.put(entry.getKey(), entry.getValue());
if (entry.getValue() instanceof Map map) {
// if some outputs are of type EncryptedString we decode them and replace the object
// if some value are of type EncryptedString we decode them and replace the object
if (EncryptedString.TYPE.equalsIgnoreCase((String)map.get("type"))) {
try {
String decoded = decrypt((String) map.get("value"));
outputs.put(entry.getKey(), decoded);
decryptedMap.put(entry.getKey(), decoded);
} catch (GeneralSecurityException e) {
throw new RuntimeException(e);
}
} else {
decryptOutputs((Map<String, Object>) map);
decryptedMap.put(entry.getKey(), decryptOutputs((Map<String, Object>) map));
}
}
}
return decryptedMap;
}
private Map<String, Object> variables(TaskRun taskRun) {
@@ -502,6 +514,8 @@ public class RunContext {
runContext.runContextLogger = this.runContextLogger;
runContext.tempBasedPath = this.tempBasedPath;
runContext.temporaryDirectory = this.temporaryDirectory;
runContext.pluginConfiguration = this.pluginConfiguration;
runContext.secretKey = this.secretKey;
return runContext;
}
@@ -583,6 +597,17 @@ public class RunContext {
return newContext;
}
public RunContext forWorkingDirectoryTask(final Task task) {
Map<String, Object> decryptedVariables = new HashMap<>(this.variables);
if (this.variables.get("outputs") != null) {
decryptedVariables.put("outputs", decryptOutputs((Map<String, Object>) this.variables.get("outputs")));
}
RunContext newRunContext = this.clone(decryptedVariables);
newRunContext.initPluginConfiguration(applicationContext, task.getClass(), task.getType());
return newRunContext;
}
public RunContext forTaskRunner(TaskRunner taskRunner) {
this.initPluginConfiguration(applicationContext, taskRunner.getClass(), taskRunner.getType());

View File

@@ -166,7 +166,7 @@ public class VariableRenderer {
}
public Map<String, Object> render(Map<String, Object> in, Map<String, Object> variables, boolean recursive) throws IllegalVariableEvaluationException {
Map<String, Object> map = new HashMap<>();
Map<String, Object> map = new LinkedHashMap<>();
for (Map.Entry<String, Object> r : in.entrySet()) {
String key = this.render(r.getKey(), variables);

View File

@@ -34,6 +34,7 @@ import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@@ -126,6 +127,9 @@ public class Worker implements Service, Runnable, AutoCloseable {
private final AtomicReference<ServiceState> state = new AtomicReference<>();
private final Integer numThreads;
private final AtomicInteger pendingJobCount = new AtomicInteger(0);
private final AtomicInteger runningJobCount = new AtomicInteger(0);
/**
* Creates a new {@link Worker} instance.
*
@@ -143,6 +147,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
ExecutorsUtils executorsUtils
) {
this.id = workerId;
this.numThreads = numThreads;
this.workerGroup = workerGroupService.resolveGroupFromKey(workerGroupKey);
this.eventPublisher = eventPublisher;
this.executorService = executorsUtils.maxCachedThreadPool(numThreads, "worker");
@@ -168,6 +173,15 @@ public class Worker implements Service, Runnable, AutoCloseable {
context.inject(this);
}
@PostConstruct
void initMetrics() {
String[] tags = this.workerGroup == null ? new String[0] : new String[] { MetricRegistry.TAG_WORKER_GROUP, this.workerGroup };
// create metrics to store thread count, pending jobs and running jobs, so we can have autoscaling easily
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_THREAD_COUNT, numThreads, tags);
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_PENDING_COUNT, pendingJobCount, tags);
this.metricRegistry.gauge(MetricRegistry.METRIC_WORKER_JOB_RUNNING_COUNT, runningJobCount, tags);
}
@Override
public void run() {
setState(ServiceState.RUNNING);
@@ -208,19 +222,29 @@ public class Worker implements Service, Runnable, AutoCloseable {
this.workerGroup,
Worker.class,
either -> {
pendingJobCount.incrementAndGet();
executorService.execute(() -> {
if (either.isRight()) {
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
handleDeserializationError(either.getRight());
return;
pendingJobCount.decrementAndGet();
runningJobCount.incrementAndGet();
try {
if (either.isRight()) {
log.error("Unable to deserialize a worker job: {}", either.getRight().getMessage());
handleDeserializationError(either.getRight());
return;
}
WorkerJob workerTask = either.getLeft();
if (workerTask instanceof WorkerTask task) {
handleTask(task);
} else if (workerTask instanceof WorkerTrigger trigger) {
handleTrigger(trigger);
}
} finally {
runningJobCount.decrementAndGet();
}
WorkerJob workerTask = either.getLeft();
if (workerTask instanceof WorkerTask task) {
handleTask(task);
} else if (workerTask instanceof WorkerTrigger trigger) {
handleTrigger(trigger);
}
});
}
);
@@ -285,7 +309,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
WorkerTask currentWorkerTask = workingDirectory.workerTask(
workerTask.getTaskRun(),
currentTask,
runContext
runContext.forWorkingDirectoryTask(currentTask)
);
// all tasks will be handled immediately by the worker
@@ -378,13 +402,13 @@ public class Worker implements Service, Runnable, AutoCloseable {
.increment();
this.metricRegistry
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup))
.timer(MetricRegistry.METRIC_WORKER_TRIGGER_DURATION, metricRegistry.tags(workerTrigger, workerGroup))
.record(() -> {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
this.evaluateTriggerRunningCount.computeIfAbsent(workerTrigger.getTriggerContext().uid(), s -> metricRegistry
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger.getTriggerContext(), workerGroup)));
.gauge(MetricRegistry.METRIC_WORKER_TRIGGER_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(workerTrigger, workerGroup)));
this.evaluateTriggerRunningCount.get(workerTrigger.getTriggerContext().uid()).addAndGet(1);
try {

View File

@@ -70,12 +70,14 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private final PluginDefaultService pluginDefaultService;
private final WorkerGroupService workerGroupService;
private final LogService logService;
protected SchedulerExecutionStateInterface executionState;
// must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
private volatile Boolean isReady = false;
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
@Getter
protected SchedulerTriggerStateInterface triggerState;
// schedulable and schedulableNextDate must be volatile and their access synchronized as they are updated and read by different threads.
@@ -178,7 +180,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
if (abstractTrigger instanceof WorkerTriggerInterface) {
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
Trigger trigger = Trigger.of(flow, abstractTrigger);
try {
this.triggerState.update(flow, abstractTrigger, conditionContext);
@@ -186,6 +187,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
logError(conditionContext, flow, abstractTrigger, e);
}
Trigger trigger = Trigger.of(flow, abstractTrigger);
this.executionKilledQueue.emit(ExecutionKilledTrigger
.builder()
.tenantId(trigger.getTenantId())
@@ -255,7 +257,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.tenantId(flowAndTrigger.flow().getTenantId())
.namespace(flowAndTrigger.flow().getNamespace())
.flowId(flowAndTrigger.flow().getId())
.flowRevision(flowAndTrigger.flow().getRevision())
.triggerId(flowAndTrigger.trigger().getId())
.date(now())
.nextExecutionDate(nextExecutionDate)
@@ -343,7 +344,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
logError(conditionContext, flow, abstractTrigger, e);
return null;
}
this.triggerState.save(triggerContext, scheduleContext);
this.triggerState.save(triggerContext, scheduleContext, "/kestra/services/scheduler/compute-schedulable/save/lastTrigger-nextDate-null");
} else {
triggerContext = lastTrigger;
}
@@ -368,7 +369,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
public List<FlowWithTriggers> schedulerTriggers() {
Map<String, Flow> flows = this.flowListeners.flows()
.stream()
.collect(Collectors.toMap(Flow::uid, Function.identity()));
.collect(Collectors.toMap(Flow::uidWithoutRevision, Function.identity()));
return this.triggerState.findAllForAllTenants().stream()
.filter(trigger -> flows.containsKey(trigger.flowUid()))
@@ -432,11 +433,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
)
.build()
)
.peek(f -> {
if (f.getTriggerContext().getEvaluateRunningDate() != null || !isExecutionNotRunning(f)) {
this.triggerState.unlock(f.getTriggerContext());
}
})
.filter(f -> f.getTriggerContext().getEvaluateRunningDate() == null)
.filter(this::isExecutionNotRunning)
.map(FlowWithWorkerTriggerNextDate::of)
@@ -472,7 +468,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
Trigger triggerRunning = Trigger.of(f.getTriggerContext(), now);
try {
this.triggerState.save(triggerRunning, scheduleContext);
this.triggerState.save(triggerRunning, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/polling");
this.sendWorkerTriggerToWorker(f);
} catch (InternalException e) {
logService.logTrigger(
@@ -497,7 +493,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
schedule.nextEvaluationDate(f.getConditionContext(), Optional.of(f.getTriggerContext()))
);
trigger = trigger.checkBackfill();
this.triggerState.save(trigger, scheduleContext);
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/schedule");
}
} else {
logService.logTrigger(
@@ -515,7 +511,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
logError(f, e);
}
var trigger = f.getTriggerContext().toBuilder().nextExecutionDate(nextExecutionDate).build().checkBackfill();
this.triggerState.save(trigger, scheduleContext);
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-false");
}
} catch (InternalException ie) {
// validate schedule condition can fail to render variables
@@ -526,13 +522,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.tenantId(f.getTriggerContext().getTenantId())
.namespace(f.getTriggerContext().getNamespace())
.flowId(f.getTriggerContext().getFlowId())
.flowRevision(f.getTriggerContext().getFlowRevision())
.labels(f.getFlow().getLabels())
.state(new State().withState(State.Type.FAILED))
.build();
ZonedDateTime nextExecutionDate = this.nextEvaluationDate(f.getAbstractTrigger());
var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
}
});
});
@@ -572,7 +567,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
// So we must save them by passing the scheduleContext.
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handleEvaluateSchedulingTriggerResult/save"));
}
protected void saveLastTriggerAndEmitExecution(Execution execution, Trigger trigger, Consumer<Trigger> saveAction) {
@@ -593,8 +588,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
return true;
}
// The execution is not yet started, we skip
if (lastTrigger.getExecutionCurrentState() == null) {
Optional<Execution> execution = executionState.findById(lastTrigger.getTenantId(), lastTrigger.getExecutionId());
// executionState hasn't received the execution, we skip
if (execution.isEmpty()) {
if (lastTrigger.getUpdatedDate() != null) {
metricRegistry
.timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger))
@@ -621,6 +618,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
}
// TODO if we set the state in the trigger after it has been started we can avoid getting the execution and
// check that if an executionId but no state, this means the execution is not started
// we need to have {@code lastTrigger.getExecutionId() == null} to be tell the execution is not running.
// the scheduler will clean the execution from the trigger and we don't keep only terminated state as an end.
if (log.isDebugEnabled()) {
logService.logTrigger(
f.getTriggerContext(),
@@ -628,7 +629,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
Level.DEBUG,
"Execution '{}' is still '{}', updated at '{}'",
lastTrigger.getExecutionId(),
lastTrigger.getExecutionCurrentState(),
execution.get().getState().getCurrent(),
lastTrigger.getUpdatedDate()
);
}
@@ -849,7 +850,6 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.tenantId(f.getTriggerContext().getTenantId())
.namespace(f.getTriggerContext().getNamespace())
.flowId(f.getTriggerContext().getFlowId())
.flowRevision(f.getTriggerContext().getFlowRevision())
.triggerId(f.getTriggerContext().getTriggerId())
.date(f.getTriggerContext().getNextExecutionDate())
.nextExecutionDate(f.getTriggerContext().getNextExecutionDate())

View File

@@ -1,4 +1,11 @@
package io.kestra.core.schedulers;
import java.util.function.Consumer;
// For tests purpose
public class DefaultScheduleContext implements ScheduleContextInterface {}
public class DefaultScheduleContext implements ScheduleContextInterface {
@Override
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
consumer.accept(this);
}
}

View File

@@ -36,10 +36,12 @@ public class DefaultScheduler extends AbstractScheduler {
public DefaultScheduler(
ApplicationContext applicationContext,
FlowListenersInterface flowListeners,
SchedulerExecutionStateInterface executionState,
SchedulerTriggerStateInterface triggerState
) {
super(applicationContext, flowListeners);
this.triggerState = triggerState;
this.executionState = executionState;
this.conditionService = applicationContext.getBean(ConditionService.class);
this.flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);

View File

@@ -1,4 +1,14 @@
package io.kestra.core.schedulers;
import java.util.function.Consumer;
/**
* This context is used by the Scheduler to allow evaluating and updating triggers in a transaction from the main evaluation loop.
* See AbstractScheduler.handle().
*/
public interface ScheduleContextInterface {
/**
* Do trigger retrieval and updating in a single transaction.
*/
void doInTransaction(Consumer<ScheduleContextInterface> consumer);
}

View File

@@ -0,0 +1,19 @@
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import java.util.Optional;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
public class SchedulerExecutionState implements SchedulerExecutionStateInterface {
@Inject
private ExecutionRepositoryInterface executionRepository;
@Override
public Optional<Execution> findById(String tenantId, String id) {
return executionRepository.findById(tenantId, id);
}
}

View File

@@ -0,0 +1,9 @@
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;
import java.util.Optional;
public interface SchedulerExecutionStateInterface {
Optional<Execution> findById(String tenantId, String id);
}

View File

@@ -20,19 +20,22 @@ public interface SchedulerTriggerStateInterface {
Trigger create(Trigger trigger) throws ConstraintViolationException;
Trigger save(Trigger trigger, ScheduleContextInterface scheduleContext, String headerContent) throws ConstraintViolationException;
Trigger create(Trigger trigger, String headerContent) throws ConstraintViolationException;
Trigger update(Trigger trigger);
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
/**
* Used by the JDBC implementation: find triggers in all tenants.
*/
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);
/**
* Required for Kafka
* Used by the Kafka implementation: find triggers in the scheduler assigned flow (as in Kafka partition assignment).
*/
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
/**
* Required for Kafka
*/
void unlock(Trigger trigger);
}

View File

@@ -345,7 +345,8 @@ public class ExecutionService {
state,
null,
null,
null
null,
true
)
.map(throwFunction(execution -> {
PurgeResult.PurgeResultBuilder<?, ?> builder = PurgeResult.builder();
@@ -478,6 +479,7 @@ public class ExecutionService {
.executionId(childExecution.getId())
.isOnKillCascade(true)
.state(ExecutionKilled.State.REQUESTED) // Event will be reentrant in the Executor.
.tenantId(tenantId)
.build()
);
}

View File

@@ -12,13 +12,23 @@ import java.util.List;
public class SkipExecutionService {
private volatile List<String> skipExecutions = Collections.emptyList();
private volatile List<FlowId> skipFlows = Collections.emptyList();
private volatile List<NamespaceId> skipNamespaces = Collections.emptyList();
private volatile List<String> skipTenants = Collections.emptyList();
public synchronized void setSkipExecutions(List<String> skipExecutions) {
this.skipExecutions = skipExecutions;
this.skipExecutions = skipExecutions == null ? Collections.emptyList() : skipExecutions;
}
public synchronized void setSkipFlows(List<String> skipFlows) {
this.skipFlows = skipFlows == null ? Collections.emptyList() : skipFlows.stream().map(flow -> FlowId.from(flow)).toList();
this.skipFlows = skipFlows == null ? Collections.emptyList() : skipFlows.stream().map(FlowId::from).toList();
}
public synchronized void setSkipNamespaces(List<String> skipNamespaces) {
this.skipNamespaces = skipNamespaces == null ? Collections.emptyList() : skipNamespaces.stream().map(NamespaceId::from).toList();
}
public synchronized void setSkipTenants(List<String> skipTenants) {
this.skipTenants = skipTenants == null ? Collections.emptyList() : skipTenants;
}
/**
@@ -38,17 +48,30 @@ public class SkipExecutionService {
@VisibleForTesting
boolean skipExecution(String tenant, String namespace, String flow, String executionId) {
return skipExecutions.contains(executionId) ||
skipFlows.contains(new FlowId(tenant, namespace, flow));
return (tenant != null && skipTenants.contains(tenant)) ||
skipNamespaces.contains(new NamespaceId(tenant, namespace)) ||
skipFlows.contains(new FlowId(tenant, namespace, flow)) ||
(executionId != null && skipExecutions.contains(executionId));
}
private static String[] splitIdParts(String id) {
return id.split("\\|");
}
record FlowId(String tenant, String namespace, String flow) {
static FlowId from(String flowId) {
String[] parts = flowId.split("\\|");
String[] parts = SkipExecutionService.splitIdParts(flowId);
if (parts.length == 3) {
return new FlowId(parts[0], parts[1], parts[2]);
}
return new FlowId(null, parts[0], parts[1]);
}
};
record NamespaceId(String tenant, String namespace) {
static NamespaceId from(String namespaceId) {
String[] parts = SkipExecutionService.splitIdParts(namespaceId);
return new NamespaceId(parts[0], parts[1]);
}
};
}

View File

@@ -152,7 +152,6 @@ abstract public class TestsUtils {
.triggerId(trigger.getId())
.flowId(flow.getId())
.namespace(flow.getNamespace())
.flowRevision(flow.getRevision())
.date(ZonedDateTime.now())
.build();

View File

@@ -11,30 +11,15 @@ import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.runners.*;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.Min;
import lombok.experimental.SuperBuilder;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.*;
import java.util.stream.Collectors;
@SuperBuilder
@@ -153,6 +138,7 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
if (this.labels != null) {
for (Map.Entry<String, String> entry : this.labels.entrySet()) {
labels.removeIf(label -> label.key().equals(entry.getKey()));
labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
}
}

View File

@@ -160,7 +160,9 @@ public class WaitFor extends Task implements FlowableTask<WaitFor.Output> {
return false;
}
Integer iterationCount = (Integer) parentTaskRun.getOutputs().get("iterationCount");
Integer iterationCount = Optional.ofNullable(parentTaskRun.getOutputs())
.map(outputs -> (Integer) outputs.get("iterationCount"))
.orElse(0);
if (this.checkFrequency.maxIterations != null && iterationCount != null && iterationCount > this.checkFrequency.maxIterations) {
if (printLog) {logger.warn("Max iterations reached");}
return true;
@@ -236,7 +238,9 @@ public class WaitFor extends Task implements FlowableTask<WaitFor.Output> {
public WaitFor.Output outputs(TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
String value = parentTaskRun != null ?
parentTaskRun.getOutputs().get("iterationCount").toString() : "0";
String.valueOf(Optional.ofNullable(parentTaskRun.getOutputs())
.map(outputs -> outputs.get("iterationCount"))
.orElse("0")) : "0";
return Output.builder()
.iterationCount(Integer.parseInt(value) + 1)

View File

@@ -270,7 +270,7 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
}
}
if (this.namespaceFiles != null ) {
if (this.namespaceFiles != null && Boolean.TRUE.equals(this.namespaceFiles.getEnabled())) {
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
namespaceFilesService.inject(runContext, taskRun.getTenantId(), taskRun.getNamespace(), runContext.tempDir(), this.namespaceFiles);
}

View File

@@ -21,6 +21,7 @@ import java.net.URI;
import java.security.GeneralSecurityException;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
@SuperBuilder
@ToString
@@ -122,6 +123,16 @@ public class Request extends AbstractHttp implements RunnableTask<Request.Output
response = client
.toBlocking()
.exchange(request, Argument.STRING, Argument.STRING);
// check that the string is a valid Unicode string
if (response.getBody().isPresent()) {
OptionalInt illegalChar = response.body().chars().filter(c -> !Character.isDefined(c)).findFirst();
if (illegalChar.isPresent()) {
throw new IllegalArgumentException("Illegal unicode code point in request body: " + illegalChar.getAsInt() +
", the Request task only support valid Unicode strings as body.\n" +
"You can try using the Download task instead.");
}
}
} catch (HttpClientResponseException e) {
if (!allowFailed) {
throw e;

View File

@@ -80,7 +80,6 @@ public class DeleteFiles extends Task implements RunnableTask<DeleteFiles.Output
private String namespace;
@NotNull
@NotEmpty
@Schema(
title = "A file or a list of files from the given namespace.",
description = "String or a list of strings; each string can either be a regex glob pattern or a file path URI.",

View File

@@ -11,7 +11,6 @@ import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.kestra.core.utils.Rethrow;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -84,7 +83,6 @@ public class DownloadFiles extends Task implements RunnableTask<DownloadFiles.Ou
private String namespace;
@NotNull
@NotEmpty
@Schema(
title = "A file or a list of files from the given namespace.",
description = "String or a list of strings; each string can either be a regex glob pattern or a file path URI.",
@@ -93,11 +91,19 @@ public class DownloadFiles extends Task implements RunnableTask<DownloadFiles.Ou
@PluginProperty(dynamic = true)
private Object files;
@Schema(
title = "The folder where the downloaded files will be stored"
)
@PluginProperty(dynamic = true)
@Builder.Default
private String destination = "";
@Override
public Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
String renderedNamespace = runContext.render(namespace);
String renderedDestination = runContext.render(destination);
// Check if namespace is allowed
RunContext.FlowInfo flowInfo = runContext.flowInfo();
FlowService flowService = runContext.getApplicationContext().getBean(FlowService.class);
@@ -120,7 +126,7 @@ public class DownloadFiles extends Task implements RunnableTask<DownloadFiles.Ou
namespaceFilesService.recursiveList(flowInfo.tenantId(), renderedNamespace, null).forEach(Rethrow.throwConsumer(uri -> {
if (patterns.stream().anyMatch(p -> p.matches(Path.of(uri.getPath())))) {
try (InputStream inputStream = namespaceFilesService.content(flowInfo.tenantId(), renderedNamespace, uri)) {
downloaded.put(uri.getPath(), runContext.storage().putFile(inputStream, uri.getPath()));
downloaded.put(uri.getPath(), runContext.storage().putFile(inputStream, destination + uri.getPath()));
logger.debug(String.format("Downloaded %s", uri));
}
}

View File

@@ -138,10 +138,10 @@ public class UploadFiles extends Task implements RunnableTask<UploadFiles.Output
});
// check for file in current tempDir that match regexs
List<PathMatcher> patterns = regexs.stream().map(reg -> FileSystems.getDefault().getPathMatcher("glob:" + reg)).toList();
for (File file : Objects.requireNonNull(runContext.tempDir().toFile().listFiles())) {
List<PathMatcher> patterns = regexs.stream().map(reg -> FileSystems.getDefault().getPathMatcher("glob:" + runContext.tempDir().toString() + checkLeadingSlash(reg))).toList();
for (File file : Objects.requireNonNull(listFilesRecursively(runContext.tempDir().toFile()))) {
if (patterns.stream().anyMatch(p -> p.matches(Path.of(file.toURI().getPath())))) {
String newFilePath = buildPath(renderedDestination, file.getName());
String newFilePath = buildPath(renderedDestination, file.getPath().replace(runContext.tempDir().toString(), ""));
storeNewFile(logger, runContext, storageInterface, flowInfo.tenantId(), newFilePath, new FileInputStream(file));
}
}
@@ -199,6 +199,24 @@ public class UploadFiles extends Task implements RunnableTask<UploadFiles.Output
}
}
private List<File> listFilesRecursively(File directory) throws IOException {
List<File> files = new ArrayList<>();
if (directory == null || !directory.isDirectory()) {
return files; // Handle invalid directory or not a directory
}
for (File file : directory.listFiles()) {
if (file.isFile()) {
files.add(file);
} else {
// Recursively call for subdirectories
files.addAll(listFilesRecursively(file));
}
}
return files;
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {

View File

@@ -1,7 +1,11 @@
package io.kestra.plugin.core.trigger;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.ArrayList;
import java.util.List;
import lombok.*;
import lombok.experimental.SuperBuilder;
import io.kestra.core.models.annotations.Example;
@@ -91,7 +95,7 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.labels(flow.getLabels())
.labels(generateLabels(runContext, flow))
.state(new State())
.trigger(ExecutionTrigger.of(
this,
@@ -128,6 +132,34 @@ public class Flow extends AbstractTrigger implements TriggerOutput<Flow.Output>
}
}
private List<Label> generateLabels(RunContext runContext, io.kestra.core.models.flows.Flow flow) {
final List<Label> labels = new ArrayList<>();
if (flow.getLabels() != null) {
labels.addAll(flow.getLabels()); // no need for rendering
}
if (this.getLabels() != null) {
for (Label label : this.getLabels()) {
final var value = renderLabelValue(runContext, label);
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}
return labels;
}
private String renderLabelValue(RunContext runContext, Label label) {
try {
return runContext.render(label.value());
} catch (IllegalVariableEvaluationException e) {
runContext.logger().warn("Failed to render label '{}', it will be omitted", label.key(), e);
return null;
}
}
@Builder
@ToString
@EqualsAndHashCode

View File

@@ -5,6 +5,7 @@ import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.Example;
@@ -349,14 +350,13 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
// validate schedule condition can fail to render variables
// in this case, we return a failed execution so the trigger is not evaluated each second
runContext.logger().error("Unable to evaluate the Schedule trigger '{}'", this.getId(), ie);
List<Label> labels = generateLabels(conditionContext, backfill);
Execution execution = Execution.builder()
.id(runContext.getTriggerExecutionId())
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.flowRevision(triggerContext.getFlowRevision())
.labels(labels)
.flowRevision(conditionContext.getFlow().getRevision())
.labels(generateLabels(runContext, conditionContext, backfill))
.state(new State().withState(State.Type.FAILED))
.build();
return Optional.of(execution);
@@ -390,7 +390,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
} else {
variables = scheduleDates.toMap();
}
List<Label> labels = generateLabels(conditionContext, backfill);
List<Label> labels = generateLabels(runContext, conditionContext, backfill);
ExecutionTrigger executionTrigger = ExecutionTrigger.of(this, variables);
@@ -399,7 +399,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
.tenantId(triggerContext.getTenantId())
.namespace(triggerContext.getNamespace())
.flowId(triggerContext.getFlowId())
.flowRevision(triggerContext.getFlowRevision())
.flowRevision(conditionContext.getFlow().getRevision())
.labels(labels)
.state(new State())
.trigger(executionTrigger)
@@ -425,19 +425,29 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
.orElse(RecoverMissedSchedules.ALL);
}
private List<Label> generateLabels(ConditionContext conditionContext, Backfill backfill) {
private List<Label> generateLabels(RunContext runContext, ConditionContext conditionContext, Backfill backfill) throws IllegalVariableEvaluationException {
List<Label> labels = new ArrayList<>();
if (conditionContext.getFlow().getLabels() != null) {
labels.addAll(conditionContext.getFlow().getLabels());
labels.addAll(conditionContext.getFlow().getLabels()); // no need for rendering
}
if (backfill != null && backfill.getLabels() != null) {
labels.addAll(backfill.getLabels());
for (Label label : backfill.getLabels()) {
final var value = runContext.render(label.value());
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}
if (this.getLabels() != null) {
labels.addAll(this.getLabels());
for (Label label : this.getLabels()) {
final var value = runContext.render(label.value());
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}
return labels;

View File

@@ -26,7 +26,6 @@ public abstract class AbstractTriggerRepositoryTest {
return Trigger.builder()
.flowId(IdUtils.create())
.namespace(TEST_NAMESPACE)
.flowRevision(1)
.triggerId(IdUtils.create())
.executionId(IdUtils.create())
.date(ZonedDateTime.now());

View File

@@ -88,8 +88,7 @@ public class DeserializationIssuesCaseTest {
"date": "2023-11-24T15:48:57.632881597Z",
"flowId": "http-trigger",
"namespace": "dev",
"triggerId": "http",
"flowRevision": 3
"triggerId": "http"
},
"conditionContext": {
"flow": {

View File

@@ -34,6 +34,13 @@ class FilesServiceTest {
assertThat(content.get("file.txt"), is("Hello World"));
}
@Test
void renderRawFile() throws Exception {
RunContext runContext = runContextFactory.of(Map.of("filename", "file.txt", "content", "Hello World"));
Map<String, String> content = FilesService.inputFiles(runContext, Map.of("{{filename}}", "{% raw %}{{content}}{% endraw %}"));
assertThat(content.get("file.txt"), is("{{content}}"));
}
@Test
void outputFiles() throws Exception {
RunContext runContext = runContextFactory.of();

View File

@@ -104,6 +104,13 @@ abstract public class FlowListenersTest {
assertThat(count.get(), is(2));
assertThat(flowListenersService.flows().size(), is(2));
});
Flow withTenant = first.toBuilder().tenantId("some-tenant").build();
flowRepository.create(withTenant, withTenant.generateSource(), pluginDefaultService.injectDefaults(withTenant));
wait(ref, () -> {
assertThat(count.get(), is(3));
assertThat(flowListenersService.flows().size(), is(3));
});
}
public static class Ref {

View File

@@ -1,9 +1,13 @@
package io.kestra.core.runners;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import java.util.LinkedHashMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -18,6 +22,9 @@ class VariableRendererTest {
@Inject
VariableRenderer.VariableConfiguration variableConfiguration;
@Inject
VariableRenderer variableRenderer;
@Test
void shouldRenderUsingAlternativeRendering() throws IllegalVariableEvaluationException {
TestVariableRenderer renderer = new TestVariableRenderer(applicationContext, variableConfiguration);
@@ -25,6 +32,25 @@ class VariableRendererTest {
Assertions.assertEquals("result", render);
}
@Test
void shouldKeepKeyOrderWhenRenderingMap() throws IllegalVariableEvaluationException {
final Map<String, Object> input = new LinkedHashMap<>();
input.put("foo-1", "A");
input.put("foo-2", "B");
final Map<String, Object> input_value3 = new LinkedHashMap<>();
input_value3.put("bar-1", "C");
input_value3.put("bar-2", "D");
input_value3.put("bar-3", "E");
//
input.put("foo-3", input_value3);
final Map<String, Object> result = variableRenderer.render(input, Map.of());
assertThat(result.keySet(), contains("foo-1", "foo-2", "foo-3"));
final Map<String, Object> result_value3 = (Map<String, Object>) result.get("foo-3");
assertThat(result_value3.keySet(), contains("bar-1", "bar-2", "bar-3"));
}
public static class TestVariableRenderer extends VariableRenderer {

View File

@@ -104,7 +104,7 @@ abstract public class AbstractSchedulerTest {
.id(IdUtils.create())
.namespace(context.getNamespace())
.flowId(context.getFlowId())
.flowRevision(context.getFlowRevision())
.flowRevision(conditionContext.getFlow().getRevision())
.state(new State())
.trigger(ExecutionTrigger.builder()
.id(this.getId())

View File

@@ -7,7 +7,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.plugin.core.trigger.Schedule;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.TestMethodScopedWorker;
import io.kestra.core.runners.Worker;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
@@ -18,13 +17,13 @@ import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;
class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
@@ -33,6 +32,9 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
private static Flow createScheduleFlow() {
Schedule schedule = Schedule.builder()
.id("hourly")
@@ -58,6 +60,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(4);
Flow flow = createScheduleFlow();
@@ -65,7 +68,6 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
triggerState.create(Trigger.builder()
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.triggerId("hourly")
.date(ZonedDateTime.parse("2021-09-06T02:00:00+01:00[Europe/Paris]"))
.build()
@@ -75,12 +77,22 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
.when(flowListenersServiceSpy)
.flows();
// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(executionRepositorySpy)
.findById(any(), any());
// start the worker as it execute polling triggers
Worker worker = new Worker(applicationContext, 8, null);
worker.run();
// scheduler
try (AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
triggerState);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)) {
executionRepositorySpy,
triggerState
)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(SchedulerConditionTest.class, either -> {
Execution execution = either.getLeft();
@@ -97,8 +109,6 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
scheduler.run();
queueCount.await(15, TimeUnit.SECONDS);
// needed for RetryingTest to work since there is no context cleaning between method => we have to clear assertion receiver manually
assertionStop.run();
assertThat(queueCount.getCount(), is(0L));
}

View File

@@ -36,6 +36,9 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
@Inject
private SchedulerTriggerStateInterface triggerState;
@Inject
private SchedulerExecutionState schedulerExecutionState;
@Inject
private FlowListeners flowListenersService;
@@ -188,6 +191,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
return new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
schedulerExecutionState,
triggerState
);
}

View File

@@ -35,6 +35,9 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
protected QueueInterface<LogEntry> logQueue;
@@ -62,10 +65,11 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
.truncatedTo(ChronoUnit.HOURS);
}
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
executionStateSpy,
triggerState
);
}
@@ -75,6 +79,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionStateSpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(6);
CountDownLatch invalidLogCount = new CountDownLatch(1);
Set<String> date = new HashSet<>();
@@ -109,7 +114,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build());
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
@@ -169,7 +174,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
Await.until(() -> {
@@ -203,7 +208,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
@@ -249,7 +254,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
@@ -293,7 +298,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(lastTrigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5));
@@ -324,7 +329,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
.build();
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
Await.until(() -> {
@@ -389,7 +394,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
// Wait 3s to see if things happen
@@ -427,7 +432,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(2);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
@@ -488,7 +493,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();

View File

@@ -42,6 +42,9 @@ public class SchedulerStreamingTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
private static Flow createFlow(Boolean failed) {
RealtimeUnitTest schedule = RealtimeUnitTest.builder()
.id("stream")
@@ -75,6 +78,7 @@ public class SchedulerStreamingTest extends AbstractSchedulerTest {
AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
executionState,
triggerState
);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)

View File

@@ -16,14 +16,14 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;
public class SchedulerThreadTest extends AbstractSchedulerTest {
@Inject
@@ -32,6 +32,9 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
public static Flow createThreadFlow() {
return createThreadFlow(null);
}
@@ -72,17 +75,23 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface schedulerExecutionStateSpy = spy(this.executionState);
doReturn(Collections.singletonList(flow))
.when(flowListenersServiceSpy)
.flows();
// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(schedulerExecutionStateSpy)
.findById(any(), any());
// scheduler
try (
AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
schedulerExecutionStateSpy,
triggerState
);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)

View File

@@ -52,6 +52,9 @@ public class SchedulerTriggerChangeTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
public static Flow createFlow(Duration sleep) {
SleepTriggerTest schedule = SleepTriggerTest.builder()
.id("sleep")
@@ -101,6 +104,7 @@ public class SchedulerTriggerChangeTest extends AbstractSchedulerTest {
AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersService,
executionState,
triggerState
);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)

View File

@@ -21,7 +21,6 @@ public abstract class SchedulerTriggerStateInterfaceTest {
return Trigger.builder()
.flowId(IdUtils.create())
.namespace("io.kestra.unittest")
.flowRevision(1)
.triggerId(IdUtils.create())
.executionId(IdUtils.create())
.date(ZonedDateTime.now());

View File

@@ -123,10 +123,10 @@ class YamlFlowParserTest {
void inputs() {
Flow flow = this.parse("flows/valids/inputs.yaml");
assertThat(flow.getInputs().size(), is(27));
assertThat(flow.getInputs().stream().filter(Input::getRequired).count(), is(9L));
assertThat(flow.getInputs().size(), is(28));
assertThat(flow.getInputs().stream().filter(Input::getRequired).count(), is(10L));
assertThat(flow.getInputs().stream().filter(r -> !r.getRequired()).count(), is(18L));
assertThat(flow.getInputs().stream().filter(r -> r.getDefaults() != null).count(), is(1L));
assertThat(flow.getInputs().stream().filter(r -> r.getDefaults() != null).count(), is(2L));
assertThat(flow.getInputs().stream().filter(r -> r instanceof StringInput && ((StringInput)r).getValidator() != null).count(), is(1L));
}

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
@@ -16,6 +17,14 @@ class SkipExecutionServiceTest {
@Inject
private SkipExecutionService skipExecutionService;
@BeforeEach
void resetAll() {
skipExecutionService.setSkipExecutions(null);
skipExecutionService.setSkipFlows(null);
skipExecutionService.setSkipNamespaces(null);
skipExecutionService.setSkipTenants(null);
}
@Test
void skipExecutionByExecutionId() {
var executionToSkip = "aaabbbccc";
@@ -65,4 +74,25 @@ class SkipExecutionServiceTest {
assertThat(skipExecutionService.skipExecution(null, "namespace", "not_skipped", "random"), is(false));
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "not_skipped", "random"), is(false));
}
@Test
void skipExecutionByNamespace() {
skipExecutionService.setSkipNamespaces(List.of("tenant|namespace"));
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "someFlow", "someExecution"), is(true));
assertThat(skipExecutionService.skipExecution(null, "namespace", "someFlow", "someExecution"), is(false));
assertThat(skipExecutionService.skipExecution("anotherTenant", "namespace", "someFlow", "someExecution"), is(false));
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "anotherFlow", "anotherExecution"), is(true));
assertThat(skipExecutionService.skipExecution("tenant", "other.namespace", "someFlow", "someExecution"), is(false));
}
@Test
void skipExecutionByTenantId() {
skipExecutionService.setSkipTenants(List.of("tenant"));
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "someFlow", "someExecution"), is(true));
assertThat(skipExecutionService.skipExecution("anotherTenant", "namespace", "someFlow", "someExecution"), is(false));
assertThat(skipExecutionService.skipExecution("tenant", "another.namespace", "someFlow", "someExecution"), is(true));
assertThat(skipExecutionService.skipExecution("anotherTenant", "another.namespace", "someFlow", "someExecution"), is(false));
}
}

View File

@@ -6,22 +6,23 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.common.EncryptedString;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.storages.InternalStorage;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@@ -30,7 +31,6 @@ import java.util.concurrent.TimeoutException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.io.FileMatchers.anExistingFile;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -38,6 +38,9 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
@Inject
Suite suite;
@Inject
RunContextFactory runContextFactory;
@Test
void success() throws TimeoutException {
suite.success(runnerUtils);
@@ -83,6 +86,11 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
suite.outputFiles(runnerUtils);
}
@Test
void encryption() throws Exception {
suite.encryption(runnerUtils, runContextFactory);
}
@Singleton
public static class Suite {
@Inject
@@ -154,8 +162,15 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
storageContext
, storageInterface
);
URI fileURI = URI.create("kestra:" + storageContext.getContextStorageURI() + "/input.txt");
assertThat(new String(storage.getFile(fileURI).readAllBytes()), is("Hello World"));
TaskRun taskRun = execution.getTaskRunList().get(1);
Map<String, Object> outputs = taskRun.getOutputs();
assertThat(outputs, hasKey("uris"));
URI uri = URI.create(((Map<String, String>) outputs.get("uris")).get("input.txt"));
assertTrue(uri.toString().endsWith("input.txt"));
assertThat(new String(storage.getFile(uri).readAllBytes()), is("Hello World"));
}
@SuppressWarnings("unchecked")
@@ -236,6 +251,18 @@ public class WorkingDirectoryTest extends AbstractMemoryRunnerTest {
assertThat(execution.findTaskRunsByTaskId("t3").get(0).getOutputs().get("value"), is("third"));
}
public void encryption(RunnerUtils runnerUtils, RunContextFactory runContextFactory) throws TimeoutException, GeneralSecurityException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "working-directory-taskrun-encrypted");
assertThat(execution.getTaskRunList(), hasSize(3));
Map<String, Object> encryptedString = (Map<String, Object>) execution.findTaskRunsByTaskId("encrypted").get(0).getOutputs().get("value");
assertThat(encryptedString.get("type"), is(EncryptedString.TYPE));
String encryptedValue = (String) encryptedString.get("value");
assertThat(encryptedValue, is(not("Hello World")));
assertThat(runContextFactory.of().decrypt(encryptedValue), is("Hello World"));
assertThat(execution.findTaskRunsByTaskId("decrypted").get(0).getOutputs().get("value"), is("Hello World"));
}
private void put(String path, String content) throws IOException {
storageInterface.put(
null,

View File

@@ -30,7 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
@MicronautTest
class DownloadTest {
public static final String FILE = "http://speedtest.ftp.otenet.gr/files/test1Mb.db";
public static final String FILE = "https://sampletestfile.com/wp-content/uploads/2023/07/500KB-CSV.csv";
@Inject
private RunContextFactory runContextFactory;
@@ -56,11 +56,11 @@ class DownloadTest {
IOUtils.toString(this.storageInterface.get(null, output.getUri()), StandardCharsets.UTF_8),
is(IOUtils.toString(new URI(FILE).toURL().openStream(), StandardCharsets.UTF_8))
);
assertThat(output.getUri().toString(), endsWith(".db"));
assertThat(output.getUri().toString(), endsWith(".csv"));
}
@Test
void noResponse() throws Exception {
void noResponse() {
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
embeddedServer.start();
@@ -100,7 +100,7 @@ class DownloadTest {
}
@Test
void error() throws Exception {
void error() {
EmbeddedServer embeddedServer = applicationContext.getBean(EmbeddedServer.class);
embeddedServer.start();

View File

@@ -11,6 +11,7 @@ import io.micronaut.context.ApplicationContext;
import io.micronaut.http.*;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.Head;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.multipart.StreamingFileUpload;
import io.micronaut.runtime.server.EmbeddedServer;
@@ -67,7 +68,7 @@ class RequestTest {
@Test
void head() throws Exception {
final String url = "http://speedtest.ftp.otenet.gr/files/test100Mb.db";
final String url = "https://sampletestfile.com/wp-content/uploads/2023/07/500KB-CSV.csv";
Request task = Request.builder()
.id(RequestTest.class.getSimpleName())
@@ -81,7 +82,7 @@ class RequestTest {
Request.Output output = task.run(runContext);
assertThat(output.getUri(), is(URI.create(url)));
assertThat(output.getHeaders().get("Content-Length").get(0), is("104857600"));
assertThat(output.getHeaders().get("content-length").getFirst(), is("512789"));
}
@Test
@@ -260,6 +261,11 @@ class RequestTest {
return HttpResponse.ok("{ \"hello\": \"world\" }");
}
@Head("/hello")
HttpResponse<String> head() {
return HttpResponse.ok();
}
@Get("/hello417")
HttpResponse<String> hello417() {
return HttpResponse.status(HttpStatus.EXPECTATION_FAILED).body("{ \"hello\": \"world\" }");

View File

@@ -7,6 +7,7 @@ import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.Worker;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.DefaultScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerTriggerStateInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.utils.IdUtils;
@@ -31,6 +32,9 @@ class TriggerTest {
@Inject
private SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
@Inject
private FlowListenersInterface flowListenersService;
@@ -51,6 +55,7 @@ class TriggerTest {
AbstractScheduler scheduler = new DefaultScheduler(
this.applicationContext,
this.flowListenersService,
this.executionState,
this.triggerState
);
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, null);
@@ -89,6 +94,7 @@ class TriggerTest {
AbstractScheduler scheduler = new DefaultScheduler(
this.applicationContext,
this.flowListenersService,
this.executionState,
this.triggerState
);
) {

View File

@@ -16,6 +16,7 @@ import java.util.Optional;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@MicronautTest
@@ -109,4 +110,49 @@ class FlowTest {
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-1", "flow-label-1")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-2", "flow-label-2")));
}
@Test
void success_withLabels() {
var flow = io.kestra.core.models.flows.Flow.builder()
.id("flow-with-flow-trigger")
.namespace("io.kestra.unittest")
.revision(1)
.labels(List.of(
new Label("flow-label-1", "flow-label-1"),
new Label("flow-label-2", "flow-label-2")
))
.tasks(Collections.singletonList(Return.builder()
.id("test")
.type(Return.class.getName())
.format("test")
.build()))
.build();
var execution = Execution.builder()
.id(IdUtils.create())
.namespace("io.kestra.unittest")
.flowId("flow-with-flow-trigger")
.flowRevision(1)
.state(State.of(State.Type.RUNNING, Collections.emptyList()))
.build();
var flowTrigger = Flow.builder()
.id("flow")
.type(Flow.class.getName())
.labels(List.of(
new Label("trigger-label-1", "trigger-label-1"),
new Label("trigger-label-2", "{{ 'trigger-label-2' }}"),
new Label("trigger-label-3", "{{ null }}"), // should return an empty string
new Label("trigger-label-4", "{{ foobar }}") // should fail
))
.build();
Optional<Execution> evaluate = flowTrigger.evaluate(runContextFactory.of(), flow, execution);
assertThat(evaluate.isPresent(), is(true));
assertThat(evaluate.get().getLabels(), hasSize(5));
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-1", "flow-label-1")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("flow-label-2", "flow-label-2")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-1", "trigger-label-1")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-2", "trigger-label-2")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-3", "")));
}
}

View File

@@ -65,7 +65,6 @@ class ScheduleTest {
return TriggerContext.builder()
.namespace(flow.getNamespace())
.flowId(flow.getNamespace())
.flowRevision(flow.getRevision())
.triggerId(schedule.getId())
.date(date)
.build();
@@ -131,6 +130,35 @@ class ScheduleTest {
assertThat(inputs.get("input2"), is("default"));
}
@Test
void success_withLabels() throws Exception {
var scheduleTrigger = Schedule.builder()
.id("schedule")
.cron("0 0 1 * *")
.labels(List.of(
new Label("trigger-label-1", "trigger-label-1"),
new Label("trigger-label-2", "{{ 'trigger-label-2' }}"),
new Label("trigger-label-3", "{{ null }}")
))
.build();
var conditionContext = conditionContext(scheduleTrigger);
var date = ZonedDateTime.now()
.withDayOfMonth(1)
.withHour(0)
.withMinute(0)
.withSecond(0)
.truncatedTo(ChronoUnit.SECONDS)
.minusMonths(1);
var triggerContext = triggerContext(date, scheduleTrigger);
Optional<Execution> evaluate = scheduleTrigger.evaluate(conditionContext, triggerContext);
assertThat(evaluate.isPresent(), is(true));
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-1", "trigger-label-1")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-2", "trigger-label-2")));
assertThat(evaluate.get().getLabels(), hasItem(new Label("trigger-label-3", "")));
}
@SuppressWarnings("unchecked")
@Test
void everyMinute() throws Exception {

View File

@@ -95,6 +95,11 @@ inputs:
- name: array
type: ARRAY
itemType: INT
# required true and an empty default value will only work if we correctly serialize default values which is what this input is about to test.
- name: empty
type: STRING
defaults: ''
required: true
tasks:
- id: string

View File

@@ -0,0 +1,13 @@
id: working-directory-taskrun-encrypted
namespace: io.kestra.tests
tasks:
- id: workingDir
type: io.kestra.plugin.core.flow.WorkingDirectory
tasks:
- id: encrypted
type: io.kestra.core.tasks.test.Encrypted
format: "Hello World"
- id: decrypted
type: io.kestra.plugin.core.debug.Return
format: "{{outputs.encrypted.value}}"

View File

@@ -1,4 +1,4 @@
version=0.17.0
version=0.17.26
jacksonVersion=2.16.2
micronautVersion=4.4.3
@@ -7,4 +7,4 @@ slf4jVersion=2.0.13
org.gradle.parallel=true
org.gradle.caching=true
org.gradle.priority=low
org.gradle.priority=low

View File

@@ -2,12 +2,13 @@ package io.kestra.schedulers.h2;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;
class H2SchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy

View File

@@ -1,6 +1,6 @@
datasources:
h2:
url: jdbc:h2:mem:public;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
url: jdbc:h2:mem:public;TIME ZONE=UTC;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
username: sa
password: ""
driverClassName: org.h2.Driver

View File

@@ -2,12 +2,13 @@ package io.kestra.schedulers.mysql;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;
class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy

View File

@@ -1,5 +1,6 @@
package io.kestra.repository.postgres;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.jdbc.JdbcMapper;
@@ -21,6 +22,7 @@ import org.jooq.Result;
import org.jooq.SelectConditionStep;
import org.jooq.impl.DSL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import jakarta.annotation.Nullable;
@@ -52,12 +54,10 @@ public class PostgresRepository<T> extends io.kestra.jdbc.AbstractJdbcRepository
@SneakyThrows
@Override
public Map<Field<Object>, Object> persistFields(T entity) {
Map<Field<Object>, Object> fields = super.persistFields(entity);
String json = JdbcMapper.of().writeValueAsString(entity);
fields.replace(AbstractJdbcRepository.field("value"), DSL.val(JSONB.valueOf(json)));
return fields;
return new HashMap<>(ImmutableMap
.of(io.kestra.jdbc.repository.AbstractJdbcRepository.field("value"), DSL.val(JSONB.valueOf(json)))
);
}
@SneakyThrows

View File

@@ -2,12 +2,13 @@ package io.kestra.schedulers.postgres;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;
class PostgresSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy

View File

@@ -61,6 +61,8 @@ import java.util.stream.Collectors;
@Singleton
public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcRepository implements ExecutionRepositoryInterface, JdbcIndexerInterface<Execution> {
private static final int FETCH_SIZE = 100;
protected final io.kestra.jdbc.AbstractJdbcRepository<Execution> jdbcRepository;
private final ApplicationEventPublisher<CrudEvent<Execution>> eventPublisher;
private final ApplicationContext applicationContext;
@@ -110,10 +112,13 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
.where(this.defaultFilter(tenantId))
.and(field("trigger_execution_id").eq(triggerExecutionId));
select.fetch()
.map(this.jdbcRepository::map)
.forEach(emitter::next);
emitter.complete();
// fetchSize will fetch rows 100 by 100 even for databases where the driver loads all in memory
// using a stream will fetch lazily, otherwise all fetches would be done before starting emitting the items
try (var stream = select.fetchSize(FETCH_SIZE).stream()) {
stream.map(this.jdbcRepository::map).forEach(emitter::next);
} finally {
emitter.complete();
}
}),
FluxSink.OverflowStrategy.BUFFER
);
@@ -172,7 +177,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
state,
labels,
triggerExecutionId,
childFilter
childFilter,
false
);
return this.jdbcRepository.fetchPage(context, select, pageable);
@@ -190,7 +196,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
@Nullable List<State.Type> state,
@Nullable Map<String, String> labels,
@Nullable String triggerExecutionId,
@Nullable ChildFilter childFilter
@Nullable ChildFilter childFilter,
boolean deleted
) {
return Flux.create(
emitter -> this.jdbcRepository
@@ -209,14 +216,17 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
state,
labels,
triggerExecutionId,
childFilter
childFilter,
deleted
);
select.fetch()
.map(this.jdbcRepository::map)
.forEach(emitter::next);
emitter.complete();
// fetchSize will fetch rows 100 by 100 even for databases where the driver loads all in memory
// using a stream will fetch lazily, otherwise all fetches would be done before starting emitting the items
try (var stream = select.fetchSize(FETCH_SIZE).stream()) {
stream.map(this.jdbcRepository::map).forEach(emitter::next);
} finally {
emitter.complete();
}
}),
FluxSink.OverflowStrategy.BUFFER
);
@@ -233,7 +243,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
@Nullable List<State.Type> state,
@Nullable Map<String, String> labels,
@Nullable String triggerExecutionId,
@Nullable ChildFilter childFilter
@Nullable ChildFilter childFilter,
boolean deleted
) {
SelectConditionStep<Record1<Object>> select = context
.select(
@@ -241,7 +252,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
)
.hint(context.configuration().dialect() == SQLDialect.MYSQL ? "SQL_CALC_FOUND_ROWS" : null)
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
.where(this.defaultFilter(tenantId, deleted));
select = filteringQuery(select, namespace, flowId, null, query, labels, triggerExecutionId, childFilter);

View File

@@ -23,6 +23,11 @@ public abstract class AbstractJdbcRepository {
return tenant.and(field("deleted", Boolean.class).eq(false));
}
protected Condition defaultFilter(String tenantId, Boolean allowDeleted) {
var tenant = buildTenantCondition(tenantId);
return allowDeleted ? tenant : tenant.and(field("deleted", Boolean.class).eq(false));
}
protected Condition buildTenantCondition(String tenantId) {
return tenantId == null ? field("tenant_id").isNull() : field("tenant_id").eq(tenantId);
}

View File

@@ -169,7 +169,6 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
Trigger current = optionalTrigger.get();
current = current.toBuilder()
.executionId(trigger.getExecutionId())
.executionCurrentState(trigger.getExecutionCurrentState())
.updatedDate(trigger.getUpdatedDate())
.build();
this.save(context, current);

View File

@@ -706,6 +706,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
.executionId(killedExecution.getExecutionId())
.isOnKillCascade(false)
.state(ExecutionKilled.State.EXECUTED)
.tenantId(killedExecution.getTenantId())
.build()
);

View File

@@ -32,10 +32,10 @@ import java.util.function.BiConsumer;
public class JdbcScheduler extends AbstractScheduler {
private final QueueInterface<Execution> executionQueue;
private final TriggerRepositoryInterface triggerRepository;
private final ConditionService conditionService;
private final FlowRepositoryInterface flowRepository;
private final JooqDSLContextWrapper dslContextWrapper;
private final ConditionService conditionService;
@SuppressWarnings("unchecked")
@@ -49,6 +49,7 @@ public class JdbcScheduler extends AbstractScheduler {
executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
triggerRepository = applicationContext.getBean(AbstractJdbcTriggerRepository.class);
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
executionState = applicationContext.getBean(SchedulerExecutionState.class);
conditionService = applicationContext.getBean(ConditionService.class);
flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
@@ -58,6 +59,7 @@ public class JdbcScheduler extends AbstractScheduler {
public void run() {
super.run();
// reset scheduler trigger at end
executionQueue.receive(
Scheduler.class,
either -> {
@@ -76,14 +78,6 @@ public class JdbcScheduler extends AbstractScheduler {
.ifPresent(trigger -> {
this.triggerState.update(trigger.resetExecution(execution.getState().getCurrent()));
});
} else {
// update execution state on each state change so the scheduler knows the execution is running
triggerRepository
.findByExecution(execution)
.filter(trigger -> execution.getState().getCurrent() != trigger.getExecutionCurrentState())
.ifPresent(trigger -> {
((JdbcSchedulerTriggerState) this.triggerState).updateExecution(Trigger.of(execution, trigger));
});
}
}
}
@@ -106,7 +100,7 @@ public class JdbcScheduler extends AbstractScheduler {
public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper);
schedulerContext.startTransaction(scheduleContextInterface -> {
schedulerContext.doInTransaction(scheduleContextInterface -> {
List<Trigger> triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface);
consumer.accept(triggers, scheduleContextInterface);

View File

@@ -18,17 +18,14 @@ public class JdbcSchedulerContext implements ScheduleContextInterface {
this.dslContextWrapper = dslContextWrapper;
}
public void startTransaction(Consumer<ScheduleContextInterface> consumer) {
@Override
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
this.dslContextWrapper.transaction(configuration -> {
this.context = DSL.using(configuration);
consumer.accept(this);
this.commit();
this.context.commit();
});
}
public void commit() {
this.context.commit();
}
}

View File

@@ -54,6 +54,18 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
return trigger;
}
@Override
public Trigger create(Trigger trigger, String headerContent) {
return this.triggerRepository.create(trigger);
}
@Override
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
this.triggerRepository.save(trigger, scheduleContextInterface);
return trigger;
}
@Override
public Trigger create(Trigger trigger) {
@@ -84,7 +96,4 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
throw new NotImplementedException();
}
@Override
public void unlock(Trigger trigger) {}
}

View File

@@ -38,6 +38,11 @@ public class MemoryExecutionRepository implements ExecutionRepositoryInterface {
return null;
}
@Override
public Flux<Execution> find(@Nullable String query, @Nullable String tenantId, @Nullable String namespace, @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, @Nullable List<State.Type> state, @Nullable Map<String, String> labels, @Nullable String triggerExecutionId, @Nullable ChildFilter childFilter, boolean allowDeleted) {
return null;
}
@Override
public ArrayListTotal<TaskRun> findTaskRun(Pageable pageable, @Nullable String query, @Nullable String tenantId, @Nullable String namespace, @Nullable String flowId, @Nullable ZonedDateTime startDate, @Nullable ZonedDateTime endDate, @Nullable List<State.Type> states, @Nullable Map<String, String> labels, @Nullable String triggerExecutionId, @Nullable ChildFilter childFilter) {
throw new UnsupportedOperationException();

View File

@@ -56,6 +56,22 @@ public class MemorySchedulerTriggerState implements SchedulerTriggerStateInterfa
return trigger;
}
@Override
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
triggers.put(trigger.uid(), trigger);
triggerQueue.emit(trigger);
return trigger;
}
@Override
public Trigger create(Trigger trigger, String headerContent) {
triggers.put(trigger.uid(), trigger);
triggerQueue.emit(trigger);
return trigger;
}
@Override
public Trigger update(Trigger trigger) {
triggers.put(trigger.uid(), trigger);
@@ -79,7 +95,4 @@ public class MemorySchedulerTriggerState implements SchedulerTriggerStateInterfa
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
throw new NotImplementedException();
}
@Override
public void unlock(Trigger trigger) {}
}

View File

@@ -131,10 +131,10 @@ public class CommandsWrapper implements TaskCommands {
@SuppressWarnings("unchecked")
public ScriptOutput run() throws Exception {
List<String> filesToUpload = new ArrayList<>();
if (this.namespaceFiles != null) {
String tenantId = ((Map<String, String>) runContext.getVariables().get("flow")).get("tenantId");
String namespace = ((Map<String, String>) runContext.getVariables().get("flow")).get("namespace");
String tenantId = ((Map<String, String>) runContext.getVariables().get("flow")).get("tenantId");
String namespace = ((Map<String, String>) runContext.getVariables().get("flow")).get("namespace");
if (this.namespaceFiles != null && Boolean.TRUE.equals(this.namespaceFiles.getEnabled())) {
NamespaceFilesService namespaceFilesService = runContext.getApplicationContext().getBean(NamespaceFilesService.class);
List<URI> injectedFiles = namespaceFilesService.inject(
runContext,

View File

@@ -7,6 +7,6 @@ import io.kestra.core.models.tasks.runners.TaskRunner;
class DockerTest extends AbstractTaskRunnerTest {
@Override
protected TaskRunner taskRunner() {
return Docker.builder().image("centos").build();
return Docker.builder().image("rockylinux:9.3-minimal").build();
}
}

View File

@@ -78,7 +78,7 @@ public class LocalStorage implements StorageInterface {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
uris.add(URI.create(file.toString()));
uris.add(URI.create(file.toString().replace("\\", "/")));
return FileVisitResult.CONTINUE;
}
@@ -90,7 +90,7 @@ public class LocalStorage implements StorageInterface {
}
});
URI fsPathUri = URI.create(fsPath.toString());
URI fsPathUri = URI.create(fsPath.toString().replace("\\", "/"));
return uris.stream().sorted(Comparator.reverseOrder())
.map(fsPathUri::relativize)
.map(URI::getPath)
@@ -115,7 +115,7 @@ public class LocalStorage implements StorageInterface {
URI relative = URI.create(
getPath(tenantId, null).relativize(
Path.of(file.toUri())
).toString()
).toString().replace("\\", "/")
);
return getAttributes(tenantId, relative);
}))

1884
ui/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -12,20 +12,20 @@
"lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs --fix"
},
"dependencies": {
"@kestra-io/ui-libs": "^0.0.47",
"@kestra-io/ui-libs": "^0.0.48",
"@vue-flow/background": "^1.3.0",
"@vue-flow/controls": "^1.1.1",
"@vue-flow/core": "^1.33.6",
"ansi-to-html": "^0.7.2",
"axios": "^1.6.8",
"axios": "^1.7.2",
"bootstrap": "^5.3.3",
"buffer": "^6.0.3",
"chart.js": "^4.4.2",
"chart.js": "^4.4.3",
"chartjs-chart-treemap": "^2.3.1",
"core-js": "^3.37.0",
"core-js": "^3.37.1",
"dagre": "^0.8.5",
"element-plus": "^2.7.2",
"humanize-duration": "^3.32.0",
"element-plus": "^2.7.5",
"humanize-duration": "^3.32.1",
"js-yaml": "^4.1.0",
"lodash": "^4.17.21",
"markdown-it": "^14.1.0",
@@ -40,45 +40,49 @@
"moment-timezone": "^0.5.45",
"node-modules-polyfill": "^0.1.4",
"nprogress": "^0.2.0",
"posthog-js": "^1.130.2",
"posthog-js": "^1.138.2",
"cronstrue": "^2.50.0",
"throttle-debounce": "^5.0.0",
"vite-plugin-eslint": "^1.8.1",
"vue": "^3.4.26",
"vue": "^3.4.27",
"vue-axios": "3.5.2",
"vue-chartjs": "^5.3.1",
"vue-gtag": "^2.0.1",
"vue-i18n": "^9.13.1",
"vue-material-design-icons": "^5.3.0",
"vue-router": "^4.3.2",
"vue-sidebar-menu": "^5.3.1",
"vue-sidebar-menu": "^5.4.0",
"vue-virtual-scroller": "^2.0.0-beta.8",
"vue3-popper": "^1.5.0",
"vue3-tour": "github:kestra-io/vue3-tour",
"vuex": "^4.1.0",
"xss": "^1.0.15",
"yaml": "^2.4.2"
"yaml": "^2.4.5"
},
"devDependencies": {
"@rushstack/eslint-patch": "^1.10.2",
"@shikijs/markdown-it": "^1.4.0",
"@typescript-eslint/parser": "^7.8.0",
"@vitejs/plugin-vue": "^5.0.4",
"@rushstack/eslint-patch": "^1.10.3",
"@shikijs/markdown-it": "^1.6.3",
"@typescript-eslint/parser": "^7.12.0",
"@vitejs/plugin-vue": "^5.0.5",
"@vue/eslint-config-prettier": "^9.0.0",
"@vue/test-utils": "^2.4.5",
"@vue/test-utils": "^2.4.6",
"decompress": "^4.2.1",
"eslint": "^8.57.0",
"eslint-plugin-vue": "^9.25.0",
"jsdom": "^24.0.0",
"monaco-editor": "^0.48.0",
"monaco-yaml": "^5.1.1",
"prettier": "^3.2.5",
"eslint-plugin-vue": "^9.26.0",
"jsdom": "^24.1.0",
"monaco-editor": "^0.49.0",
"monaco-yaml": "^5.2.0",
"prettier": "^3.3.1",
"rollup-plugin-copy": "^3.5.0",
"rollup-plugin-visualizer": "^5.12.0",
"sass": "^1.76.0",
"sass": "^1.77.4",
"typescript": "^5.4.5",
"vite": "^5.2.11",
"vite-plugin-rewrite-all": "1.0.1",
"vitest": "^1.5.3"
"vite": "^5.2.13",
"vitest": "^1.6.0"
},
"optionalDependencies": {
"@rollup/rollup-linux-x64-gnu": "4.18.0"
},
"overrides": {
"bootstrap": {

View File

@@ -209,7 +209,7 @@
},
watch: {
$route(to) {
if (this.user && to.name === "home" && this.overallTotal === 0) {
if (to.name === "home" && this.overallTotal === 0) {
this.$router.push({
name: "welcome",
params: {

Binary file not shown.

After

Width:  |  Height:  |  Size: 73 KiB

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 4.9 KiB

View File

@@ -1,6 +1,10 @@
<template>
<el-tooltip :persistent="false" :focus-on-show="true" popper-class="ee-tooltip" :disabled="!disabled" :placement="placement">
<el-tooltip :visible="visible" :persistent="false" :focus-on-show="true" popper-class="ee-tooltip" :disabled="!disabled" :placement="placement">
<template #content v-if="link">
<el-button circle class="ee-tooltip-close" @click="changeVisibility(false)">
<Close />
</el-button>
<p>{{ $t("ee-tooltip.features-blocked") }}</p>
<a
@@ -13,7 +17,7 @@
</a>
</template>
<template #default>
<span ref="slot-container">
<span ref="slot-container" class="cursor-pointer" @click="changeVisibility()">
<slot />
<lock v-if="disabled" />
</span>
@@ -22,10 +26,11 @@
</template>
<script>
import Close from "vue-material-design-icons/Close.vue";
import Lock from "vue-material-design-icons/Lock.vue";
export default {
components: {Lock},
components: {Close, Lock},
props: {
top: {
type: Boolean,
@@ -48,6 +53,16 @@
default: undefined
},
},
data() {
return {
visible: false,
}
},
methods: {
changeVisibility(visible = true) {
this.visible = visible
}
},
computed: {
link() {
@@ -83,5 +98,13 @@
:deep(.material-design-icon) > .material-design-icon__svg {
bottom: -0.125em;
}
.ee-tooltip-close {
position: absolute;
top: 0;
right: 0;
border: none;
margin: 0.5rem;
}
</style>

View File

@@ -77,12 +77,6 @@
</router-link>
</template>
</el-table-column>
<el-table-column :label="$t('state')">
<template #default="scope">
<status v-if="scope.row.executionCurrentState" :status="scope.row.executionCurrentState" size="small" />
</template>
</el-table-column>
<el-table-column :label="$t('date')">
<template #default="scope">
<date-ago :inverted="true" :date="scope.row.date" />
@@ -171,7 +165,6 @@
import RefreshButton from "../layout/RefreshButton.vue";
import DateAgo from "../layout/DateAgo.vue";
import Id from "../Id.vue";
import Status from "../Status.vue";
import {mapState} from "vuex";
export default {
@@ -183,7 +176,6 @@
SearchField,
NamespaceSelect,
DateAgo,
Status,
Id,
},
data() {

View File

@@ -20,6 +20,7 @@
<el-form-item
:label="$t('password')"
required
prop="password"
>
<el-input v-model="form.password" type="password" show-password />
</el-form-item>
@@ -62,6 +63,28 @@
trigger: ["blur"],
pattern: "^$|^[a-zA-Z0-9_!#$%&*+/=?`{|}~^.-]+@[a-zA-Z0-9.-]+$"
},
{
validator: (rule, value, callback) => {
if (value && value.length > 256) {
callback(new Error(this.$t("email length constraint")));
} else {
callback();
}
},
trigger: ["blur", "change"]
}
],
password: [
{
validator: (rule, value, callback) => {
if (value && value.length > 256) {
callback(new Error(this.$t("password length constraint")));
} else {
callback();
}
},
trigger: ["blur", "change"]
}
],
confirmPassword: [
{

View File

@@ -7,6 +7,10 @@
<p>
<span v-html="$t('errors.' + code + '.content')" />
</p>
<el-button tag="router-link" :to="{name: 'home'}" type="primary">
{{ $t("back_to_dashboard") }}
</el-button>
</section>
</template>
@@ -42,19 +46,23 @@
<style lang="scss" scoped>
.errors {
h2 {
margin-bottom: calc(var(--spacer) * 2);
}
width: 100%;
margin-top: 10em;
text-align: center;
.img {
display: inline-block;
background: url("../../assets/errors/sorry.svg") no-repeat;
background: url("../../assets/errors/kestra-error.png") no-repeat center;
background-size: contain;
height: 300px;
width: 300px;
}
h2 {
line-height: 30px;
font-size: 20px;
font-weight: 600;
}
p {
line-height: 22px;
font-size: 14px;
}
}
</style>

View File

@@ -69,6 +69,14 @@
if (oldValue.name === newValue.name && this.previousExecutionId !== this.$route.params.id) {
this.follow()
}
// if we change the execution id, we need to close the sse
if (this.$route.params.id != this.execution.id) {
this.closeSSE();
window.removeEventListener("popstate", this.follow)
this.$store.commit("execution/setExecution", undefined);
this.$store.commit("flow/setFlow", undefined);
this.$store.commit("flow/setFlowGraph", undefined);
}
},
},
methods: {
@@ -91,13 +99,16 @@
}
// sse.onerror doesnt return the details of the error
// but as our emitter can only throw an error on 404
// we can safely assume that the error
// we can safely assume that the error is a 404
// if execution is not defined
this.sse.onerror = () => {
this.$store.dispatch("core/showMessage", {
variant: "error",
title: this.$t("error"),
message: this.$t("errors.404.flow or execution"),
});
if (!this.execution) {
this.$store.dispatch("core/showMessage", {
variant: "error",
title: this.$t("error"),
message: this.$t("errors.404.flow or execution"),
});
}
}
});
},

View File

@@ -425,6 +425,8 @@
import {ElMessageBox, ElSwitch, ElFormItem, ElAlert} from "element-plus";
import {h, ref} from "vue";
import {filterLabels} from "./utils"
export default {
mixins: [RouteContext, RestoreUrl, DataTableActions, SelectTableActions],
components: {
@@ -809,6 +811,13 @@
);
},
setLabels() {
const filtered = filterLabels(this.executionLabels)
if(filtered.error) {
this.$toast().error(this.$t("wrong labels"))
return;
}
this.$toast().confirm(
this.$t("bulk set labels", {"executionCount": this.queryBulkAction ? this.total : this.selection.length}),
() => {
@@ -819,7 +828,7 @@
sort: this.$route.query.sort || "state.startDate:desc",
state: this.$route.query.state ? [this.$route.query.state] : this.statuses
}, false),
data: this.executionLabels
data: filtered.labels
})
.then(r => {
this.$toast().success(this.$t("Set labels done", {executionCount: r.data.count}));
@@ -829,7 +838,7 @@
return this.$store
.dispatch("execution/bulkSetLabels", {
executionsId: this.selection,
executionLabels: this.executionLabels
executionLabels: filtered.labels
})
.then(r => {
this.$toast().success(this.$t("Set labels done", {executionCount: r.data.count}));

View File

@@ -53,6 +53,8 @@
import LabelInput from "../../components/labels/LabelInput.vue";
import State from "../../utils/state";
import {filterLabels} from "./utils"
export default {
components: {LabelInput,},
props: {
@@ -71,9 +73,16 @@
},
methods: {
setLabels() {
const filtered = filterLabels(this.executionLabels)
if(filtered.error) {
this.$toast().error(this.$t("wrong labels"))
return;
}
this.isOpen = false;
this.$store.dispatch("execution/setLabels", {
labels: this.executionLabels,
labels: filtered.labels,
executionId: this.execution.id
}).then(response => {
this.$store.commit("execution/setExecution", response.data)

View File

@@ -263,6 +263,8 @@
return this.attempts(taskRun)[this.selectedAttemptNumberByTaskRunId[taskRun.id] ?? 0];
},
taskType(taskRun) {
if(!taskRun) return undefined;
const task = FlowUtils.findTaskById(this.flow, taskRun.taskId);
const parentTaskRunId = taskRun.parentTaskRunId;
if (task === undefined && parentTaskRunId) {

View File

@@ -16,6 +16,7 @@
:start-date="startDate"
:end-date="endDate"
@update:model-value="onAbsFilterChange"
class="w-auto"
/>
<relative-date-select
v-if="selectedFilterType === filterType.RELATIVE"

View File

@@ -0,0 +1,14 @@
interface Label {
key: string | null;
value: string | null;
}
interface FilterResult {
labels: Label[];
error?: boolean;
}
export const filterLabels = (labels: Label[]): FilterResult => {
const invalid = labels.some(label => label.key === null || label.value === null);
return invalid ? {labels, error: true} : {labels};
};

View File

@@ -41,7 +41,7 @@
if (this.$route.query.reset) {
localStorage.setItem("tourDoneOrSkip", undefined);
this.$store.commit("core/setGuidedProperties", {tourStarted: false});
this.$tours["guidedTour"].start();
this.$tours["guidedTour"]?.start();
}
this.setupFlow()
},

Some files were not shown because too many files have changed in this diff Show More