Compare commits

...

133 Commits

Author SHA1 Message Date
YannC.
5c8f3e88c9 chore(version): upgrade version to v0.15.38 2025-05-20 09:46:44 +02:00
Loïc Mathieu
0c4f94a3c9 fix(system)*: reset the trigger into the KafkaScheduler instead of the ExecutorMain 2025-05-19 12:07:02 +02:00
brian.mulier
a6fb5fba87 chore(version): upgrade version to v0.15.37 2025-04-09 18:00:21 +02:00
brian.mulier
ba7f44cf70 fix(core)!: prevent failing execution in case of duplicate label upon inheritance 2025-04-09 17:59:48 +02:00
YannC
b1d31e8de4 chore(version): upgrade version to v0.15.36 2025-03-06 11:38:27 +01:00
YannC
016de55a3b fix(): align to EE (#7703) 2025-03-06 10:38:45 +01:00
Loïc Mathieu
72e13f4be4 fix(CI): QUEMU 2025-02-24 16:01:13 +01:00
Loïc Mathieu
97c05543b4 chore: upgrade to 0.15.35 2025-02-24 13:00:03 +01:00
Florian Hussonnois
a2e02a84e2 chore: add optional sequential id to executor
This commit adds an optional seqId property to the
Executor class that can be used to detect concurrent/stale updates
on execution.
2025-02-24 12:49:17 +01:00
Loïc Mathieu
cdfccc9fff chore: version 0.15.34 2025-02-04 09:35:45 +01:00
Loïc Mathieu
93d70eb958 chore: ignore allure report directory 2025-02-04 09:35:10 +01:00
Loïc Mathieu
336e876d30 fix(core): subflow inherit labels 2025-01-30 17:01:34 +01:00
Florian Hussonnois
9495f7a665 chore: upgrade to version 0.15.33 2025-01-28 14:58:05 +01:00
Florian Hussonnois
2ab23d745a fix(webserver): ensure queues are not closed in nioEventLoop 2025-01-27 13:59:34 +01:00
YannC
12ae42e3c0 chore: upgrade to version 0.15.32 2025-01-22 16:02:19 +01:00
Loïc Mathieu
750e9b916f chore(deps): upgrade micrometer-core to 1.14.3 2025-01-22 15:19:36 +01:00
Loïc Mathieu
8244a1668a fix(ui,webserver): send a start execution to make sure the SSE connection is initialized 2025-01-22 14:12:48 +01:00
brian.mulier
e5ae20d0b0 chore: update to version 0.15.31 2025-01-15 11:39:22 +01:00
Loïc Mathieu
a6874c1bae fix(core, ui): send a "start" event to be sure the UI receive the SSE
The UI only store a reference to the logs SSE when receive the first event.
In case a flow didn't emit any log, or the logs tab is closed before any logs is emitted, the UI will not have any reference to the SSE so the SSE connection would stay alive forever.
Each SSE connection starts a thread via the logs queue, creating a thread leak.

Sending a first "start" event makes sure the UI has a reference to the SSE.
2025-01-15 10:56:24 +01:00
Loïc Mathieu
46fc028404 chore: update to version 0.15.30 2025-01-08 11:03:10 +01:00
Loïc Mathieu
b95e780b4b fix(webserver): discard file upload in case of errors 2025-01-08 10:57:22 +01:00
Loïc Mathieu
eae410a088 chore(webserver): ensure all input streams are closed 2025-01-08 10:53:48 +01:00
Loïc Mathieu
4daf771a04 fix(webserver): properly close the queue on Flux.onFinally
This is a re-do of https://github.com/kestra-io/kestra/pull/6292 for 0.15
2025-01-08 10:44:11 +01:00
YannC
681fe6b5cc chore: upgrade to version v0.15.29 2024-12-24 15:31:30 +01:00
brian.mulier
b56ba5eb6f chore(version): version 0.15.28 2024-12-18 13:06:38 +01:00
Loïc Mathieu
63f12a11a9 chore(version): version 0.15.27 2024-12-12 17:50:42 +01:00
Loïc Mathieu
1bfc68f8b9 feat(core,jdbc): small trigger / scheduler improvements 2024-12-11 15:41:39 +01:00
YannC
569d78a2e7 chore: upgrade to version 0.15.26 2024-12-04 15:55:18 +01:00
Loïc Mathieu
a77248c8a5 chore: version 0.15.25 2024-11-27 17:06:34 +01:00
Loïc Mathieu
65877da404 Revert "feat(core): remove the execution state from the scheduler (#1588)"
This reverts commit f7d3d0bcd4.
2024-11-27 17:06:34 +01:00
Loïc Mathieu
cb17695d94 chore: remove --break-system-packages 2024-11-08 16:53:56 +01:00
Loïc Mathieu
3d28e73415 chore: version 0.15.24 2024-11-08 11:54:58 +01:00
Loïc Mathieu
9a5a417514 fix(cli,core,webserver): better handle input files 2024-11-08 11:54:40 +01:00
YannC
1172109c75 chore: update to version v0.15.23 2024-10-01 17:46:24 +02:00
YannC
7ad3fc6fb4 fix(ui): save button not appearing 2024-10-01 17:46:24 +02:00
YannC
0199229c39 chore: update version to v0.15.22 2024-09-16 18:13:37 +02:00
YannC
5e892a215d fix(ui): template size issue
close kestra-io/kestra-ee#1388
2024-09-16 18:04:23 +02:00
Florian Hussonnois
035e87ad79 chore: update version to v0.15.21 2024-09-12 19:44:17 +02:00
brian.mulier
6f01732f0a fix(core): add flow revision to failed execution in case of trigger evaluation failure 2024-09-12 19:41:56 +02:00
YannC
5b431bf9c6 chore: update version to v0.15.20 2024-09-05 14:47:23 +02:00
YannC
20b3e942e1 fix: adapt MySql commit to Java 17 2024-09-05 14:47:23 +02:00
Florian Hussonnois
749ea3da17 fix(mysql): temproray fix to optimize fetch from queue table
This commit adds a temporary fix to replace FIND_IN_SET with
a IN clause in order to properly use the index on the consumers column
2024-09-05 14:25:43 +02:00
YannC
949208be71 chore: upgrade to version 0.15.19 2024-09-03 16:37:46 +02:00
YannC
69e3ffa152 fix: use jammy image in Dockerfile 2024-09-03 16:37:16 +02:00
YannC
159e9a2a01 fix: Dockerfile python issue 2024-09-02 15:59:01 +02:00
YannC
3724dea797 fix: change docker command in ci 2024-09-02 14:18:27 +02:00
YannC
2a0acb02c5 chore: upgrade to version 0.15.18 2024-09-02 14:09:29 +02:00
brian.mulier
4a8989b197 chore: upgrade to version 0.15.17 2024-06-17 11:37:36 +02:00
brian.mulier
4084855cb0 fix(core): nullable tenants & executions for execution skips 2024-06-17 11:37:36 +02:00
brian.mulier
7d9d5874b0 feat(*): skip executions for a tenant
part of kestra-io/kestra-ee#1247
2024-06-17 11:00:27 +02:00
brian.mulier
7b0a1348d6 fix(*): add tenant id to namespace identifier for skip execution by namespace
part of kestra-io/kestra-ee#1247
2024-06-17 11:00:27 +02:00
brian.mulier
c3a23c86ef feat(*): skip executions for a namespace
part of kestra-io/kestra-ee#1247
2024-06-17 11:00:27 +02:00
Loïc Mathieu
544fc2c0eb feat(*): skip executions based on flow identifiers
Fixes #3383
2024-06-17 11:00:27 +02:00
YannC
bd5ebf3ecb chore: upgrade to version 0.15.16 2024-06-05 21:06:01 +02:00
YannC
74784a430c fix(): handle namespace variable in eval 2024-06-05 20:49:15 +02:00
YannC
b7b43f3ec1 chore: upgrade to version 0.15.15 2024-05-20 12:41:23 +02:00
Ludovic DEHON
990332ddfe refactor(core): don't expose multiple entry on collector service 2024-05-20 12:40:23 +02:00
brian.mulier
de60d13f07 chore: upgrade to version 0.15.14 2024-04-25 18:40:21 +02:00
brian.mulier
2aea92a751 fix(webserver): prevent non-webserver from crashing due to lacking BasicAuthService 2024-04-25 18:39:53 +02:00
YannC
39e0e64c9d chore: upgrade to version 0.15.13 2024-04-23 11:17:36 +02:00
Florian Hussonnois
00172f185b fix(core): VariableRenderer should expose alternativeRender 2024-04-23 11:17:13 +02:00
YannC
2211539873 chore: upgrade to version 0.15.12 2024-04-22 17:58:53 +02:00
Loïc Mathieu
d03f95f722 ci: set plugin version via a variable to allow easily changing it 2024-04-22 17:55:26 +02:00
YannC
73a733dae3 fix(scheduler): better handling of locked triggers (#3603) 2024-04-22 17:46:03 +02:00
YannC
009b737bd4 chore: upgrade to version 0.15.11 2024-04-04 16:25:39 +02:00
YannC
2cd0862802 fix(test): test does not pass during release 2024-04-04 16:25:39 +02:00
YannC
f8a5293e77 fix(webserver): set paused to success if not subtask (#3458) 2024-04-04 15:18:05 +02:00
YannC
3ab499aa2f fix(ui): allow backfill with not required inputs 2024-04-04 15:18:05 +02:00
brian.mulier
ce48979d27 fix(ui): no more editor shrink due to localStorage value missing
closes #3451
2024-04-04 15:18:05 +02:00
brian.mulier
f7c481c3a5 fix(webserver): ability to turn off basic authentication through configuration 2024-04-04 15:18:05 +02:00
Loïc Mathieu
cdf4070993 fix(core): skip directory and un-readable file on WorkingDirectory post action 2024-04-04 15:18:05 +02:00
Loïc Mathieu
a7d793f55b chore: upgrade to version 0.15.10 2024-03-28 14:33:34 +01:00
brian.mulier
f226a98dca fix(core): properly handle deprecation for input's name property to prevent false warnings 2024-03-28 14:31:47 +01:00
Florian Hussonnois
cc1144aa25 fix(core): fix stats webserver (#3408)
Fix: #3408
2024-03-28 14:31:47 +01:00
Loïc Mathieu
693d0d5201 fix(core): improve Execution.findChilds()
When computing output, findChilds is called for each taskrun so we go throught the ist of taskruns for each taskrun which is very time consuming.
Pre-computing a map of taskrun by ID improve things a lot.
I validated it using a CPU profile, before this change the findChilds() methods took up to 18% of CPU time on a contrieved example with a big EachParallel, with this change the new method is not prominent in the profile as it's very quick.
2024-03-28 14:31:47 +01:00
Loïc Mathieu
a6ef3f3f7a fix(core): MapUtils performance 2024-03-28 14:31:47 +01:00
Loïc Mathieu
ce61b1c916 fix(webserver): Triggers page crash when a trigger is deleted
Fixes #3327
2024-03-28 14:31:47 +01:00
brian.mulier
6dafa44aa9 fix(core): add getName() to input to have soft deprecation 2024-03-28 14:31:47 +01:00
brian.mulier
c297fdc6d6 fix(core): variable renderer handle properly raw tags when recursive rendering 2024-03-28 14:31:47 +01:00
YannC
2c9b21cb0c chore(version): update to version 'v0.15.9' 2024-03-25 15:03:48 +01:00
brian.mulier
581ba70743 fix(editor): editor won't fully shrink anymore 2024-03-25 14:26:56 +01:00
YannC
1fd768e122 fix(ui): boolean input label 2024-03-25 14:26:45 +01:00
YannC
12808e7e34 fix(ui): translate enable auth message 2024-03-25 14:25:52 +01:00
YannC
ae3c5045fe chore(ui): split languages in multiples files (#3313) 2024-03-25 14:25:11 +01:00
brian.mulier
196d3078d8 fix(core): runContext decrypt method as public 2024-03-25 14:16:07 +01:00
brian.mulier
ed7b2f1b39 fix(core): prevent crashing if secret input is null
solves https://kestra-io.slack.com/archives/C03FQKXRK3K/p1710777137811119
2024-03-25 14:15:54 +01:00
brian.mulier
c84bf39365 fix(webserver): no longer decrypt secret inputs & outputs when doing an evaluate for an execution taskrun
closes kestra-io/kestra-ee#986
2024-03-25 14:15:46 +01:00
brian.mulier
152b6da8b6 fix(ui): remove Axios' default URL to prevent duplicate context path in called URL
closes #2630
2024-03-25 14:15:40 +01:00
brian.mulier
37accb5bd7 fix(core): remove ScheduleBackfill, Condition & ScheduleCondition from defs
closes kestra-io/docs#732
2024-03-25 14:15:35 +01:00
brian.mulier
a14a5de0bb fix(ui): remove additional warnings 2024-03-25 14:15:27 +01:00
yuri1969
99145933db chore(ui): fix various Vue dev warnings
Flood of Vue warnings made the console hard to work with.

* Attempted to fix various reoccuring issues.
* Migrated el-radio.
2024-03-25 14:14:57 +01:00
brian.mulier
7e785297f0 fix(topology): fixed some edge cases where graph uids were wrong 2024-03-25 14:14:50 +01:00
brian.mulier
c7b0103226 fix(topology): fixed a bug where having a subflow with a trigger with same id than current flow would lead to wrong graph 2024-03-25 14:14:44 +01:00
brian.mulier
ffc8f3cac5 chore(version): update to version 'v0.15.8' 2024-03-15 19:16:30 +01:00
brian.mulier
b73625d666 fix(webserver): override InputStream available method in NamespaceFileController.putNamespaceFile 2024-03-15 19:12:39 +01:00
Loïc Mathieu
f76e55bafb chore: upgrade to version v0.15.7 2024-03-15 12:10:16 +01:00
Florian Hussonnois
686721c7ee chore: bump version to v0.15.6 2024-03-14 14:30:49 +01:00
brian-mulier-p
9f0e3bb14a fix(core): prevent flow validation from crashing (#3278) 2024-03-14 14:27:41 +01:00
Ludovic DEHON
1dc11fb44e fix(core): missing lombok annotation on new storage tasks 2024-03-14 13:53:04 +01:00
Loïc Mathieu
b2e41223b4 fix(core): null label value can crash the executor
A label with a null value will generates an exception while computing the list of variables inside the RunContext, failing to create a RunContext which can crash the executor.
2024-03-14 13:52:49 +01:00
YannC
579d30160c fix(ui): ExecutionRoot marge issue 2024-03-14 13:52:30 +01:00
Loïc Mathieu
bfe3548bd7 feat(core): add default inputs if not already set in variables
Polling triggers will create an execution that didn't contains default inputs, with this change default inputs will always be included in variables.

Fixes https://github.com/kestra-io/plugin-mqtt/issues/37
2024-03-14 13:51:43 +01:00
brian.mulier
eebcc3a010 chore(version): update to version 'v0.15.5' 2024-03-12 16:12:04 +01:00
YannC
6d65dda19a fix(ui): add top margin in tabs components 2024-03-12 16:00:15 +01:00
brian.mulier
6768950515 chore(deps): bump ui-libs to 0.0.39 2024-03-12 15:26:34 +01:00
YannC
93bbc83b5c fix(): set timeout to sse and now display loading/error on UI (#3259) 2024-03-12 15:26:25 +01:00
brian.mulier
7fbfbe1d00 fix(core): Pause task properly handled in restart
closes #2084
2024-03-12 15:26:17 +01:00
YannC
f53f788100 fix(core): Avoid creating empty files when splitting (#3254) 2024-03-12 15:26:07 +01:00
YannC
8e3a9e4380 fix(core): create dependency between forEachItem task and subflow (#3256) 2024-03-12 15:25:58 +01:00
brian.mulier
cb82e9f08e fix(webserver): change cookie decoder to netty one 2024-03-12 15:25:38 +01:00
brian.mulier
9deb02983c fix(webserver): rollback to http 1.1 2024-03-12 15:25:28 +01:00
brian.mulier
9b1939e129 fix(ui): grayed-out triggers when disabled (in source or through API) in topology
closes #3048
2024-03-12 15:25:15 +01:00
brian.mulier
4dfcbca7de fix(ui): revision author is now fetched only once we know which revision to display to prevent inconsistencies
closes kestra-io/kestra-ee#805
2024-03-12 15:25:02 +01:00
brian.mulier
103320e348 chore(version): update to version 'v0.15.4' 2024-03-08 16:24:02 +01:00
YannC
410093cefc chore(version): update to version 'v0.15.3' 2024-03-07 16:02:22 +01:00
YannC
bd936125bd test(core): DocumentationGeneratorTest, deprecated message 2024-03-07 16:01:33 +01:00
brian.mulier
68ded87434 fix(webserver): multi-cookies in a single header decoder
part of #3228
2024-03-07 15:55:07 +01:00
Florian Hussonnois
d76807235f fix(core): pebble render function must render boolean (#3218)
Fix: #3218
2024-03-07 14:32:18 +01:00
YannC
63708a79e3 fix(core): take timezone into account for new schedule triggers (#3230)
closes #3227
2024-03-07 14:32:18 +01:00
YannC
41c0018d4b fix(ui): manage panel for SuperAdmin without tenant (#3225) 2024-03-07 14:32:18 +01:00
YannC
2cbd86c4d1 fix(): quickwins (#3215)
* fix(ui): increase flow input size in blueprints creation

closes #913

* feat(ui): keep the latest pagination size selected

closes #3030

* fix(ui): Do not display revision selector when only one revision

closes #1681

* fix(ui): can not save if flow is same as original

closes #1331

* fix(ui): now load execution before following it

closes #682

* fix(ui): display trigger.date instead of variables.date on execution page

closes #2832

* feat(ui): add new last 48 hours filter

closes #3184
2024-03-07 14:32:17 +01:00
YannC
49b64fa853 fix(ui): make dependencies expand more clear (#3222)
closes #3160
2024-03-07 14:32:17 +01:00
Ludovic DEHON
95113c5e76 fix(ui): don't save settings on page load 2024-03-07 14:32:17 +01:00
Ludovic DEHON
1e5e300974 feat(ui): change menu & icon layout
close kestra-io/kestra#2161
2024-03-07 14:32:17 +01:00
Loïc Mathieu
f69dc3c835 fix(core): DocumentationGeneratorTest.ech() test assertion 2024-03-07 14:32:17 +01:00
Ludovic DEHON
651c7bf589 feat(ui): add full-screen button on drawer
close kestra-io/kestra#2627
2024-03-07 14:32:17 +01:00
YannC
7878bcc281 fix(core): validate task default (#3224)
closes #25
2024-03-07 14:30:00 +01:00
YannC
ee059106b2 fix(controller): return 404 when flow not found in follow API (#3219)
* fix(controller): return 404 when flow not found in follow API

closes #1299

* fix(): review changes
2024-03-07 14:30:00 +01:00
YannC
4d2728a3f6 chore(version): update to version 'v0.15.2' 2024-03-04 21:52:07 +01:00
Loïc Mathieu
87f7cde742 fix(ui): missing check permission to display flow CREATE and EXECUTE button
When a user didn't have FLOW CREATE permission, the 'Create' buton will disapear from the Dashboard and the Flows pages.
When a user didn't have EXECUTION CREATE permission, the 'Execute' button will disapear from the Flow detail and Execution detail pages.
2024-03-04 18:21:03 +01:00
YannC
2e18c87907 fix(ui): change editor width storage key 2024-03-04 18:17:07 +01:00
YannC
58352411b5 fix(controller): fix 404 issue when flow of a trigger has been deleted (#3209) 2024-03-04 17:53:13 +01:00
Ludovic DEHON
438619dd8c chore(version): update to version 'v0.15.1'. 2024-03-01 23:38:36 +01:00
157 changed files with 2845 additions and 1472 deletions

View File

@@ -23,6 +23,11 @@ on:
required: false
type: string
default: "false"
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
@@ -51,7 +56,7 @@ jobs:
# Services
- name: Build the docker-compose stack
run: docker-compose -f docker-compose-ci.yml up -d
run: docker compose -f docker-compose-ci.yml up -d
if: ${{ github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '' }}
# Caches
@@ -169,6 +174,8 @@ jobs:
runs-on: ubuntu-latest
needs: check
if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/develop' || github.ref == 'refs/heads/release' || startsWith(github.ref, 'refs/tags/v')
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || '[0.15,0.16.0-SNAPSHOT)' }}
strategy:
matrix:
image:
@@ -178,85 +185,85 @@ jobs:
python-libs: ""
- name: "-full"
plugins: >-
io.kestra.plugin:plugin-airbyte:LATEST
io.kestra.plugin:plugin-amqp:LATEST
io.kestra.plugin:plugin-ansible:LATEST
io.kestra.plugin:plugin-aws:LATEST
io.kestra.plugin:plugin-azure:LATEST
io.kestra.plugin:plugin-cassandra:LATEST
io.kestra.plugin:plugin-cloudquery:LATEST
io.kestra.plugin:plugin-compress:LATEST
io.kestra.plugin:plugin-couchbase:LATEST
io.kestra.plugin:plugin-crypto:LATEST
io.kestra.plugin:plugin-databricks:LATEST
io.kestra.plugin:plugin-dataform:LATEST
io.kestra.plugin:plugin-dbt:LATEST
io.kestra.plugin:plugin-debezium-mysql:LATEST
io.kestra.plugin:plugin-debezium-postgres:LATEST
io.kestra.plugin:plugin-debezium-sqlserver:LATEST
io.kestra.plugin:plugin-docker:LATEST
io.kestra.plugin:plugin-elasticsearch:LATEST
io.kestra.plugin:plugin-fivetran:LATEST
io.kestra.plugin:plugin-fs:LATEST
io.kestra.plugin:plugin-gcp:LATEST
io.kestra.plugin:plugin-git:LATEST
io.kestra.plugin:plugin-googleworkspace:LATEST
io.kestra.plugin:plugin-hightouch:LATEST
io.kestra.plugin:plugin-jdbc-clickhouse:LATEST
io.kestra.plugin:plugin-jdbc-duckdb:LATEST
io.kestra.plugin:plugin-jdbc-druid:LATEST
io.kestra.plugin:plugin-jdbc-mysql:LATEST
io.kestra.plugin:plugin-jdbc-oracle:LATEST
io.kestra.plugin:plugin-jdbc-pinot:LATEST
io.kestra.plugin:plugin-jdbc-postgres:LATEST
io.kestra.plugin:plugin-jdbc-redshift:LATEST
io.kestra.plugin:plugin-jdbc-rockset:LATEST
io.kestra.plugin:plugin-jdbc-snowflake:LATEST
io.kestra.plugin:plugin-jdbc-sqlserver:LATEST
io.kestra.plugin:plugin-jdbc-trino:LATEST
io.kestra.plugin:plugin-jdbc-vectorwise:LATEST
io.kestra.plugin:plugin-jdbc-vertica:LATEST
io.kestra.plugin:plugin-jdbc-dremio:LATEST
io.kestra.plugin:plugin-jdbc-arrow-flight:LATEST
io.kestra.plugin:plugin-jdbc-sqlite:LATEST
io.kestra.plugin:plugin-kafka:LATEST
io.kestra.plugin:plugin-kubernetes:LATEST
io.kestra.plugin:plugin-malloy:LATEST
io.kestra.plugin:plugin-modal:LATEST
io.kestra.plugin:plugin-mongodb:LATEST
io.kestra.plugin:plugin-mqtt:LATEST
io.kestra.plugin:plugin-nats:LATEST
io.kestra.plugin:plugin-neo4j:LATEST
io.kestra.plugin:plugin-notifications:LATEST
io.kestra.plugin:plugin-openai:LATEST
io.kestra.plugin:plugin-powerbi:LATEST
io.kestra.plugin:plugin-pulsar:LATEST
io.kestra.plugin:plugin-redis:LATEST
io.kestra.plugin:plugin-script-groovy:LATEST
io.kestra.plugin:plugin-script-julia:LATEST
io.kestra.plugin:plugin-script-jython:LATEST
io.kestra.plugin:plugin-script-nashorn:LATEST
io.kestra.plugin:plugin-script-node:LATEST
io.kestra.plugin:plugin-script-powershell:LATEST
io.kestra.plugin:plugin-script-python:LATEST
io.kestra.plugin:plugin-script-r:LATEST
io.kestra.plugin:plugin-script-ruby:LATEST
io.kestra.plugin:plugin-script-shell:LATEST
io.kestra.plugin:plugin-serdes:LATEST
io.kestra.plugin:plugin-servicenow:LATEST
io.kestra.plugin:plugin-singer:LATEST
io.kestra.plugin:plugin-soda:LATEST
io.kestra.plugin:plugin-solace:LATEST
io.kestra.plugin:plugin-spark:LATEST
io.kestra.plugin:plugin-sqlmesh:LATEST
io.kestra.plugin:plugin-surrealdb:LATEST
io.kestra.plugin:plugin-terraform:LATEST
io.kestra.plugin:plugin-tika:LATEST
io.kestra.plugin:plugin-weaviate:LATEST
io.kestra.storage:storage-azure:LATEST
io.kestra.storage:storage-gcs:LATEST
io.kestra.storage:storage-minio:LATEST
io.kestra.storage:storage-s3:LATEST
io.kestra.plugin:plugin-airbyte:$PLUGIN_VERSION
io.kestra.plugin:plugin-amqp:$PLUGIN_VERSION
io.kestra.plugin:plugin-ansible:$PLUGIN_VERSION
io.kestra.plugin:plugin-aws:$PLUGIN_VERSION
io.kestra.plugin:plugin-azure:$PLUGIN_VERSION
io.kestra.plugin:plugin-cassandra:$PLUGIN_VERSION
io.kestra.plugin:plugin-cloudquery:$PLUGIN_VERSION
io.kestra.plugin:plugin-compress:$PLUGIN_VERSION
io.kestra.plugin:plugin-couchbase:$PLUGIN_VERSION
io.kestra.plugin:plugin-crypto:$PLUGIN_VERSION
io.kestra.plugin:plugin-databricks:$PLUGIN_VERSION
io.kestra.plugin:plugin-dataform:$PLUGIN_VERSION
io.kestra.plugin:plugin-dbt:$PLUGIN_VERSION
io.kestra.plugin:plugin-debezium-mysql:$PLUGIN_VERSION
io.kestra.plugin:plugin-debezium-postgres:$PLUGIN_VERSION
io.kestra.plugin:plugin-debezium-sqlserver:$PLUGIN_VERSION
io.kestra.plugin:plugin-docker:$PLUGIN_VERSION
io.kestra.plugin:plugin-elasticsearch:$PLUGIN_VERSION
io.kestra.plugin:plugin-fivetran:$PLUGIN_VERSION
io.kestra.plugin:plugin-fs:$PLUGIN_VERSION
io.kestra.plugin:plugin-gcp:$PLUGIN_VERSION
io.kestra.plugin:plugin-git:$PLUGIN_VERSION
io.kestra.plugin:plugin-googleworkspace:$PLUGIN_VERSION
io.kestra.plugin:plugin-hightouch:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-clickhouse:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-duckdb:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-druid:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-mysql:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-oracle:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-pinot:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-postgres:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-redshift:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-rockset:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-snowflake:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-sqlserver:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-trino:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-vectorwise:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-vertica:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-dremio:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-arrow-flight:$PLUGIN_VERSION
io.kestra.plugin:plugin-jdbc-sqlite:$PLUGIN_VERSION
io.kestra.plugin:plugin-kafka:$PLUGIN_VERSION
io.kestra.plugin:plugin-kubernetes:$PLUGIN_VERSION
io.kestra.plugin:plugin-malloy:$PLUGIN_VERSION
io.kestra.plugin:plugin-modal:$PLUGIN_VERSION
io.kestra.plugin:plugin-mongodb:$PLUGIN_VERSION
io.kestra.plugin:plugin-mqtt:$PLUGIN_VERSION
io.kestra.plugin:plugin-nats:$PLUGIN_VERSION
io.kestra.plugin:plugin-neo4j:$PLUGIN_VERSION
io.kestra.plugin:plugin-notifications:$PLUGIN_VERSION
io.kestra.plugin:plugin-openai:$PLUGIN_VERSION
io.kestra.plugin:plugin-powerbi:$PLUGIN_VERSION
io.kestra.plugin:plugin-pulsar:$PLUGIN_VERSION
io.kestra.plugin:plugin-redis:$PLUGIN_VERSION
io.kestra.plugin:plugin-script-groovy:$PLUGIN_VERSION
io.kestra.plugin:plugin-script-julia:$PLUGIN_VERSION
io.kestra.plugin:plugin-script-jython:$PLUGIN_VERSION
io.kestra.plugin:plugin-script-nashorn:$PLUGIN_VERSION
io.kestra.plugin:plugin-script-node:$PLUGIN_VERSION
io.kestra.plugin:plugin-script-powershell:$PLUGIN_VERSION
io.kestra.plugin:plugin-script-python:$PLUGIN_VERSION
io.kestra.plugin:plugin-script-r:$PLUGIN_VERSION
io.kestra.plugin:plugin-script-ruby:$PLUGIN_VERSION
io.kestra.plugin:plugin-script-shell:$PLUGIN_VERSION
io.kestra.plugin:plugin-serdes:$PLUGIN_VERSION
io.kestra.plugin:plugin-servicenow:$PLUGIN_VERSION
io.kestra.plugin:plugin-singer:$PLUGIN_VERSION
io.kestra.plugin:plugin-soda:$PLUGIN_VERSION
io.kestra.plugin:plugin-solace:$PLUGIN_VERSION
io.kestra.plugin:plugin-spark:$PLUGIN_VERSION
io.kestra.plugin:plugin-sqlmesh:$PLUGIN_VERSION
io.kestra.plugin:plugin-surrealdb:$PLUGIN_VERSION
io.kestra.plugin:plugin-terraform:$PLUGIN_VERSION
io.kestra.plugin:plugin-tika:$PLUGIN_VERSION
io.kestra.plugin:plugin-weaviate:$PLUGIN_VERSION
io.kestra.storage:storage-azure:$PLUGIN_VERSION
io.kestra.storage:storage-gcs:$PLUGIN_VERSION
io.kestra.storage:storage-minio:$PLUGIN_VERSION
io.kestra.storage:storage-s3:$PLUGIN_VERSION
packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip
python-libs: kestra
steps:
@@ -296,6 +303,11 @@ jobs:
if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/develop' || github.ref == 'refs/heads/release' || startsWith(github.ref, 'refs/tags/v')
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Set up Docker Buildx
if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/develop' || github.ref == 'refs/heads/release' || startsWith(github.ref, 'refs/tags/v')
uses: docker/setup-buildx-action@v3

3
.gitignore vendored
View File

@@ -51,4 +51,5 @@ core/src/main/resources/gradle.properties
.plugins.override
# H2 Database
data
data
core/allure-results/

View File

@@ -1,4 +1,4 @@
FROM eclipse-temurin:17-jre
FROM eclipse-temurin:17-jre-jammy
ARG KESTRA_PLUGINS=""
ARG APT_PACKAGES=""

View File

@@ -118,7 +118,7 @@ allprojects {
implementation "io.micronaut:micronaut-jackson-databind"
implementation "io.micronaut.data:micronaut-data-model"
implementation "io.micronaut:micronaut-management"
implementation "io.micrometer:micrometer-core"
implementation "io.micrometer:micrometer-core:1.14.3"
implementation "io.micronaut.micrometer:micronaut-micrometer-registry-prometheus"
implementation "io.micronaut:micronaut-http-client"
implementation "io.micronaut.reactor:micronaut-reactor-http-client"

View File

@@ -29,6 +29,15 @@ public class ExecutorCommand extends AbstractServerCommand {
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipExecutions = Collections.emptyList();
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (tenant|namespace|flowId) to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipFlows = Collections.emptyList();
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipNamespaces = Collections.emptyList();
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipTenants = Collections.emptyList();
@SuppressWarnings("unused")
public static Map<String, Object> propertiesOverrides() {
return ImmutableMap.of(
@@ -39,6 +48,9 @@ public class ExecutorCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
this.skipExecutionService.setSkipExecutions(skipExecutions);
this.skipExecutionService.setSkipFlows(skipFlows);
this.skipExecutionService.setSkipNamespaces(skipNamespaces);
this.skipExecutionService.setSkipTenants(skipTenants);
super.call();

View File

@@ -41,6 +41,15 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipExecutions = Collections.emptyList();
@CommandLine.Option(names = {"--skip-flows"}, split=",", description = "a list of flow identifiers (namespace.flowId) to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipFlows = Collections.emptyList();
@CommandLine.Option(names = {"--skip-namespaces"}, split=",", description = "a list of namespace identifiers (tenant|namespace) to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipNamespaces = Collections.emptyList();
@CommandLine.Option(names = {"--skip-tenants"}, split=",", description = "a list of tenants to skip, separated by a coma; for troubleshooting purpose only")
private List<String> skipTenants = Collections.emptyList();
@SuppressWarnings("unused")
public static Map<String, Object> propertiesOverrides() {
return ImmutableMap.of(
@@ -51,6 +60,9 @@ public class StandAloneCommand extends AbstractServerCommand {
@Override
public Integer call() throws Exception {
this.skipExecutionService.setSkipExecutions(skipExecutions);
this.skipExecutionService.setSkipFlows(skipFlows);
this.skipExecutionService.setSkipNamespaces(skipNamespaces);
this.skipExecutionService.setSkipTenants(skipTenants);
super.call();

View File

@@ -16,7 +16,7 @@ micronaut:
max-request-size: 10GB
multipart:
max-file-size: 10GB
disk: true
mixed: true
read-idle-timeout: 60m
write-idle-timeout: 60m
idle-timeout: 60m
@@ -37,11 +37,17 @@ micronaut:
- /ui/.+
- /health
- /prometheus
http-version: HTTP_1_1
caches:
default:
maximum-weight: 10485760
http:
client:
read-idle-timeout: 60s
connect-timeout: 30s
read-timeout: 60s
http-version: HTTP_1_1
services:
api:
url: https://api.kestra.io

View File

@@ -26,6 +26,10 @@ public abstract class AbstractClassDocumentation<T> {
protected Map<String, Object> defs = new TreeMap<>();
protected Map<String, Object> inputs = new TreeMap<>();
protected Map<String, Object> propertiesSchema;
private final List<String> defsExclusions = List.of(
"io.kestra.core.models.conditions.Condition",
"io.kestra.core.models.conditions.ScheduleCondition"
);
@SuppressWarnings("unchecked")
protected AbstractClassDocumentation(JsonSchemaGenerator jsonSchemaGenerator, Class<? extends T> cls, Class<T> baseCls) {
@@ -36,6 +40,7 @@ public abstract class AbstractClassDocumentation<T> {
if (this.propertiesSchema.containsKey("$defs")) {
this.defs.putAll((Map<String, Object>) this.propertiesSchema.get("$defs"));
defsExclusions.forEach(this.defs::remove);
this.propertiesSchema.remove("$defs");
}

View File

@@ -26,6 +26,10 @@ public class EncryptionService {
* The IV is concatenated at the beginning of the string.
*/
public static String encrypt(String key, String plainText) throws GeneralSecurityException {
if (plainText == null || plainText.isEmpty()) {
return plainText;
}
byte[] keyBytes = Base64.getDecoder().decode(key);
SecretKey secretKey = new SecretKeySpec(keyBytes, KEY_ALGORITHM);
Cipher cipher = Cipher.getInstance(CIPHER_ALGORITHM);
@@ -43,6 +47,10 @@ public class EncryptionService {
* The IV is recovered from the beginning of the string.
*/
public static String decrypt(String key, String cipherText) throws GeneralSecurityException {
if (cipherText == null || cipherText.isEmpty()) {
return cipherText;
}
byte[] keyBytes = Base64.getDecoder().decode(key);
SecretKey secretKey = new SecretKeySpec(keyBytes, KEY_ALGORITHM);
byte[] input = Base64.getDecoder().decode(cipherText);

View File

@@ -189,7 +189,7 @@ public class Execution implements DeletedInterface, TenantInterface {
public List<TaskRun> findTaskRunsByTaskId(String id) {
if (this.taskRunList == null) {
return new ArrayList<>();
return Collections.emptyList();
}
return this.taskRunList
@@ -199,7 +199,7 @@ public class Execution implements DeletedInterface, TenantInterface {
}
public TaskRun findTaskRunByTaskRunId(String id) throws InternalException {
Optional<TaskRun> find = (this.taskRunList == null ? new ArrayList<TaskRun>() : this.taskRunList)
Optional<TaskRun> find = (this.taskRunList == null ? Collections.<TaskRun>emptyList() : this.taskRunList)
.stream()
.filter(taskRun -> taskRun.getId().equals(id))
.findFirst();
@@ -212,9 +212,9 @@ public class Execution implements DeletedInterface, TenantInterface {
}
public TaskRun findTaskRunByTaskIdAndValue(String id, List<String> values) throws InternalException {
Optional<TaskRun> find = (this.taskRunList == null ? new ArrayList<TaskRun>() : this.taskRunList)
Optional<TaskRun> find = (this.taskRunList == null ? Collections.<TaskRun>emptyList() : this.taskRunList)
.stream()
.filter(taskRun -> taskRun.getTaskId().equals(id) && findChildsValues(taskRun, true).equals(values))
.filter(taskRun -> taskRun.getTaskId().equals(id) && findParentsValues(taskRun, true).equals(values))
.findFirst();
if (find.isEmpty()) {
@@ -279,7 +279,7 @@ public class Execution implements DeletedInterface, TenantInterface {
public List<TaskRun> findTaskRunByTasks(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
if (resolvedTasks == null || this.taskRunList == null) {
return new ArrayList<>();
return Collections.emptyList();
}
return this
@@ -543,7 +543,6 @@ public class Execution implements DeletedInterface, TenantInterface {
return null;
}
})
.filter(Objects::nonNull)
.orElseGet(() -> new FailedExecutionWithLog(
this.state.getCurrent() != State.Type.FAILED ? this.withState(State.Type.FAILED) : this,
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(this))
@@ -633,24 +632,29 @@ public class Execution implements DeletedInterface, TenantInterface {
return ImmutableMap.of();
}
Map<String, Object> result = new HashMap<>();
// we pre-compute the map of taskrun by id to avoid traversing the list of all taskrun for each taskrun
Map<String, TaskRun> byIds = this.taskRunList.stream().collect(Collectors.toMap(
taskRun -> taskRun.getId(),
taskRun -> taskRun
));
Map<String, Object> result = new HashMap<>();
for (TaskRun current : this.taskRunList) {
if (current.getOutputs() != null) {
result = MapUtils.merge(result, outputs(current));
result = MapUtils.merge(result, outputs(current, byIds));
}
}
return result;
}
private Map<String, Object> outputs(TaskRun taskRun) {
List<TaskRun> childs = findChilds(taskRun)
private Map<String, Object> outputs(TaskRun taskRun, Map<String, TaskRun> byIds) {
List<TaskRun> parents = findParents(taskRun, byIds)
.stream()
.filter(r -> r.getValue() != null)
.collect(Collectors.toList());
.toList();
if (childs.size() == 0) {
if (parents.isEmpty()) {
if (taskRun.getValue() == null) {
return Map.of(taskRun.getTaskId(), taskRun.getOutputs());
} else {
@@ -658,15 +662,13 @@ public class Execution implements DeletedInterface, TenantInterface {
}
}
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = MapUtils.newHashMap(1);
Map<String, Object> current = result;
for (TaskRun t : childs) {
if (t.getValue() != null) {
HashMap<String, Object> item = new HashMap<>();
current.put(t.getValue(), item);
current = item;
}
for (TaskRun t : parents) {
HashMap<String, Object> item = MapUtils.newHashMap(1);
current.put(t.getValue(), item);
current = item;
}
if (taskRun.getOutputs() != null) {
@@ -684,10 +686,10 @@ public class Execution implements DeletedInterface, TenantInterface {
public List<Map<String, Object>> parents(TaskRun taskRun) {
List<Map<String, Object>> result = new ArrayList<>();
List<TaskRun> childs = findChilds(taskRun);
Collections.reverse(childs);
List<TaskRun> parents = findParents(taskRun);
Collections.reverse(parents);
for (TaskRun childTaskRun : childs) {
for (TaskRun childTaskRun : parents) {
HashMap<String, Object> current = new HashMap<>();
if (childTaskRun.getValue() != null) {
@@ -698,7 +700,7 @@ public class Execution implements DeletedInterface, TenantInterface {
current.put("outputs", childTaskRun.getOutputs());
}
if (current.size() > 0) {
if (!current.isEmpty()) {
result.add(current);
}
}
@@ -707,22 +709,21 @@ public class Execution implements DeletedInterface, TenantInterface {
}
/**
* Find all children from this {@link TaskRun}. The list is starting from deeper child and end on closest child, so
* first element is the task that start first.
* This method don't return the current tasks
* Find all parents from this {@link TaskRun}.
* The list is starting from deeper parent and end on the closest parent,
* so the first element is the task that starts first.
* This method doesn't return the current tasks.
*
* @param taskRun current child
* @return List of parent {@link TaskRun}
*/
public List<TaskRun> findChilds(TaskRun taskRun) {
public List<TaskRun> findParents(TaskRun taskRun) {
if (taskRun.getParentTaskRunId() == null || this.taskRunList == null) {
return new ArrayList<>();
return Collections.emptyList();
}
ArrayList<TaskRun> result = new ArrayList<>();
boolean ended = false;
while (!ended) {
final TaskRun finalTaskRun = taskRun;
Optional<TaskRun> find = this.taskRunList
@@ -743,10 +744,39 @@ public class Execution implements DeletedInterface, TenantInterface {
return result;
}
public List<String> findChildsValues(TaskRun taskRun, boolean withCurrent) {
/**
* Find all parents from this {@link TaskRun}.
* This method does the same as #findParents(TaskRun taskRun) but for performance reason, as it's called a lot,
* we pre-compute the map of taskrun by ID and use it here.
*/
private List<TaskRun> findParents(TaskRun taskRun, Map<String, TaskRun> taskRunById) {
if (taskRun.getParentTaskRunId() == null || taskRunById.isEmpty()) {
return Collections.emptyList();
}
ArrayList<TaskRun> result = new ArrayList<>();
boolean ended = false;
while (!ended) {
final TaskRun finalTaskRun = taskRun;
TaskRun find = taskRunById.get(finalTaskRun.getParentTaskRunId());
if (find != null) {
result.add(find);
taskRun = find;
} else {
ended = true;
}
}
Collections.reverse(result);
return result;
}
public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
return (withCurrent ?
Stream.concat(findChilds(taskRun).stream(), Stream.of(taskRun)) :
findChilds(taskRun).stream()
Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :
findParents(taskRun).stream()
)
.filter(t -> t.getValue() != null)
.map(TaskRun::getValue)

View File

@@ -44,6 +44,9 @@ public abstract class Input<T> implements Data {
@Pattern(regexp="^[a-zA-Z0-9][.a-zA-Z0-9_-]*")
String id;
@Deprecated
String name;
@NotNull
@Valid
Type type;
@@ -58,11 +61,11 @@ public abstract class Input<T> implements Data {
public abstract void validate(T input) throws ConstraintViolationException;
@JsonSetter
@Deprecated
public void setName(String name) {
if (this.id == null) {
this.id = name;
}
}
this.name = name;
}
}

View File

@@ -1,5 +1,6 @@
package io.kestra.core.models.flows;
import io.kestra.core.validations.TaskDefaultValidation;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.convert.format.MapFormat;
import io.micronaut.core.naming.conventions.StringConvention;
@@ -13,6 +14,7 @@ import java.util.Map;
@Builder(toBuilder = true)
@AllArgsConstructor
@Introspected
@TaskDefaultValidation
public class TaskDefault {
private final String type;

View File

@@ -1,6 +1,7 @@
package io.kestra.core.models.hierarchies;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.micronaut.core.annotation.Introspected;
import lombok.Getter;
import lombok.ToString;
@@ -9,18 +10,20 @@ import lombok.ToString;
@Getter
@Introspected
public abstract class AbstractGraphTrigger extends AbstractGraph {
private final AbstractTrigger trigger;
private final AbstractTrigger triggerDeclaration;
private final Trigger trigger;
public AbstractGraphTrigger(AbstractTrigger trigger) {
public AbstractGraphTrigger(AbstractTrigger triggerDeclaration, Trigger trigger) {
super();
this.triggerDeclaration = triggerDeclaration;
this.trigger = trigger;
}
@Override
public String getUid() {
if (this.trigger != null) {
return this.trigger.getId();
if (this.uid == null && this.triggerDeclaration != null) {
return this.triggerDeclaration.getId();
}
return this.uid;

View File

@@ -69,7 +69,7 @@ public class GraphCluster extends AbstractGraph {
public void addNode(AbstractGraph node, boolean withClusterUidPrefix) {
if (withClusterUidPrefix) {
node.updateUidWithChildren(prefixedUid(node.uid));
node.updateUidWithChildren(prefixedUid(Optional.ofNullable(node.uid).orElse(node.getUid())));
}
this.getGraph().addNode(node);
}
@@ -110,7 +110,9 @@ public class GraphCluster extends AbstractGraph {
// this is because we need other clusters' root & end to have edges over them, but they are already managed by their own cluster
(!(node instanceof GraphClusterRoot) && !(node instanceof GraphClusterEnd))
|| node.equals(this.root) || node.equals(this.end))
.forEach(node -> node.updateUidWithChildren(uid + node.uid.substring(this.uid.length())));
.forEach(node -> node.updateUidWithChildren(uid +
Optional.ofNullable(node.uid).orElse(node.getUid()).substring(this.uid.length())
));
super.updateUidWithChildren(uid);
}

View File

@@ -1,10 +1,11 @@
package io.kestra.core.models.hierarchies;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
public class GraphTrigger extends AbstractGraphTrigger {
public GraphTrigger(AbstractTrigger trigger) {
super(trigger);
public GraphTrigger(AbstractTrigger triggerDeclaration, Trigger trigger) {
super(triggerDeclaration, trigger);
}
}

View File

@@ -26,9 +26,6 @@ public class Trigger extends TriggerContext {
@Nullable
private String executionId;
@Nullable
private State.Type executionCurrentState;
@Nullable
private Instant updatedDate;
@@ -38,7 +35,6 @@ public class Trigger extends TriggerContext {
protected Trigger(TriggerBuilder<?, ?> b) {
super(b);
this.executionId = b.executionId;
this.executionCurrentState = b.executionCurrentState;
this.updatedDate = b.updatedDate;
this.evaluateRunningDate = b.evaluateRunningDate;
}
@@ -138,7 +134,6 @@ public class Trigger extends TriggerContext {
.date(trigger.getDate())
.nextExecutionDate(trigger.getNextExecutionDate())
.executionId(execution.getId())
.executionCurrentState(execution.getState().getCurrent())
.updatedDate(Instant.now())
.backfill(trigger.getBackfill())
.stopAfter(trigger.getStopAfter())

View File

@@ -198,7 +198,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
)
@PluginProperty
@Deprecated
private ScheduleBackfill backfill;
private Map<String, Object> backfill;
@Schema(
title = "What to do in case of missed schedules",
@@ -254,12 +254,12 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
// is after the end, then we calculate again the nextDate
// based on now()
if (backfill != null && nextDate != null && nextDate.isAfter(backfill.getEnd())) {
nextDate = computeNextEvaluationDate(executionTime, ZonedDateTime.now()).orElse(null);
nextDate = computeNextEvaluationDate(executionTime, convertDateTime(ZonedDateTime.now())).orElse(null);
}
}
// no previous present & no backfill or recover missed schedules, just provide now
else {
nextDate = computeNextEvaluationDate(executionTime, ZonedDateTime.now()).orElse(null);
nextDate = computeNextEvaluationDate(executionTime, convertDateTime(ZonedDateTime.now())).orElse(null);
}
// if max delay reached, we calculate a new date except if we are doing a backfill
@@ -280,7 +280,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
public ZonedDateTime nextEvaluationDate() {
// it didn't take into account the schedule condition, but as they are taken into account inside eval() it's OK.
ExecutionTime executionTime = this.executionTime();
return computeNextEvaluationDate(executionTime, ZonedDateTime.now()).orElse(ZonedDateTime.now());
return computeNextEvaluationDate(executionTime, convertDateTime(ZonedDateTime.now())).orElse(convertDateTime(ZonedDateTime.now()));
}
public ZonedDateTime previousEvaluationDate(ConditionContext conditionContext) {
@@ -301,7 +301,7 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
conditionContext.getRunContext().logger().warn("Unable to evaluate the conditions for the next evaluation date for trigger '{}', conditions will not be evaluated", this.getId());
}
}
return computePreviousEvaluationDate(executionTime, ZonedDateTime.now()).orElse(ZonedDateTime.now());
return computePreviousEvaluationDate(executionTime, convertDateTime(ZonedDateTime.now())).orElse(convertDateTime(ZonedDateTime.now()));
}
@Override
@@ -591,14 +591,6 @@ public class Schedule extends AbstractTrigger implements PollingTriggerInterface
private ZonedDateTime previous;
}
@Deprecated
public static class ScheduleBackfill {
@Schema(
title = "The first start date."
)
ZonedDateTime start;
}
public enum RecoverMissedSchedules {
LAST,
NONE,

View File

@@ -14,6 +14,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.storages.Storage;
import io.kestra.core.utils.ListUtils;
import lombok.extern.slf4j.Slf4j;
import java.io.InputStream;
@@ -99,12 +100,18 @@ public final class ExecutableUtils {
"flowRevision", currentFlow.getRevision()
);
// remove labels that are also defined in the flow to avoid duplicate
List<Label> flowLabels = ListUtils.emptyOnNull(flow.getLabels());
List<Label> filteredLabel = ListUtils.emptyOnNull(labels).stream()
.filter(label -> flowLabels.stream().noneMatch(flowLabel -> flowLabel.key().equals(label.key())))
.toList();
RunnerUtils runnerUtils = runContext.getApplicationContext().getBean(RunnerUtils.class);
Execution execution = runnerUtils
.newExecution(
flow,
(f, e) -> runnerUtils.typedInputs(f, e, inputs),
labels)
filteredLabel)
.withTrigger(ExecutionTrigger.builder()
.id(currentTask.getId())
.type(currentTask.getType())

View File

@@ -34,6 +34,11 @@ public class Executor {
private ExecutionResumed executionResumed;
private ExecutionResumed joinedExecutionResumed;
/**
* The sequence id should be incremented each time the execution is persisted after mutation.
*/
private long seqId = 0L;
/**
* List of {@link ExecutionKilled} to be propagated part of the execution.
*/
@@ -44,6 +49,12 @@ public class Executor {
this.offset = offset;
}
public Executor(Execution execution, Long offset, long seqId) {
this.execution = execution;
this.offset = offset;
this.seqId = seqId;
}
public Executor(WorkerTaskResult workerTaskResult) {
this.joinedWorkerTaskResult = workerTaskResult;
}
@@ -147,7 +158,18 @@ public class Executor {
public Executor serialize() {
return new Executor(
this.execution,
this.offset
this.offset,
this.seqId
);
}
/**
* Increments and returns the execution sequence id.
*
* @return the sequence id.
*/
public long incrementAndGetSeqId() {
this.seqId++;
return seqId;
}
}

View File

@@ -23,6 +23,10 @@ public interface FlowExecutorInterface {
}
default Optional<Flow> findByExecution(Execution execution) {
if (execution.getFlowRevision() == null) {
return Optional.empty();
}
return this.findById(
execution.getTenantId(),
execution.getNamespace(),

View File

@@ -82,6 +82,13 @@ public class RunContext {
this.initContext(flow, null, execution, null);
}
/**
* Equivalent to {@link #RunContext(ApplicationContext, Flow, Task, Execution, TaskRun, boolean)} with decryptVariables set to true
*/
public RunContext(ApplicationContext applicationContext, Flow flow, Task task, Execution execution, TaskRun taskRun) {
this(applicationContext, flow, task, execution, taskRun, true);
}
/**
* Normal usage
*
@@ -90,11 +97,12 @@ public class RunContext {
* @param task the current {@link io.kestra.core.models.tasks.Task}
* @param execution the current {@link Execution}
* @param taskRun the current {@link TaskRun}
* @param decryptVariables whether or not to decrypt secret variables
*/
public RunContext(ApplicationContext applicationContext, Flow flow, Task task, Execution execution, TaskRun taskRun) {
public RunContext(ApplicationContext applicationContext, Flow flow, Task task, Execution execution, TaskRun taskRun, boolean decryptVariables) {
this.initBean(applicationContext);
this.initLogger(taskRun, task);
this.initContext(flow, task, execution, taskRun);
this.initContext(flow, task, execution, taskRun, decryptVariables);
this.initPluginConfiguration(applicationContext, task.getType());
}
@@ -156,7 +164,11 @@ public class RunContext {
}
private void initContext(Flow flow, Task task, Execution execution, TaskRun taskRun) {
this.variables = this.variables(flow, task, execution, taskRun, null);
this.initContext(flow, task, execution, taskRun, true);
}
private void initContext(Flow flow, Task task, Execution execution, TaskRun taskRun, boolean decryptVariables) {
this.variables = this.variables(flow, task, execution, taskRun, null, decryptVariables);
if (taskRun != null && this.storageInterface != null) {
this.storage = new InternalStorage(
@@ -234,6 +246,10 @@ public class RunContext {
}
protected Map<String, Object> variables(Flow flow, Task task, Execution execution, TaskRun taskRun, AbstractTrigger trigger) {
return this.variables(flow, task, execution, taskRun, trigger, true);
}
protected Map<String, Object> variables(Flow flow, Task task, Execution execution, TaskRun taskRun, AbstractTrigger trigger, boolean decryptVariables) {
ImmutableMap.Builder<String, Object> builder = ImmutableMap.<String, Object>builder()
.put("envs", runContextCache.getEnvVars())
.put("globals", runContextCache.getGlobalVars());
@@ -295,13 +311,16 @@ public class RunContext {
if (execution.getTaskRunList() != null) {
Map<String, Object> outputs = new HashMap<>(execution.outputs());
decryptOutputs(outputs);
if (decryptVariables) {
decryptOutputs(outputs);
}
builder.put("outputs", outputs);
}
Map<String, Object> inputs = new HashMap<>();
if (execution.getInputs() != null) {
Map<String, Object> inputs = new HashMap<>(execution.getInputs());
if (flow != null && flow.getInputs() != null) {
inputs.putAll(execution.getInputs());
if (decryptVariables && flow != null && flow.getInputs() != null) {
// if some inputs are of type secret, we decode them
for (Input<?> input : flow.getInputs()) {
if (input instanceof SecretInput && inputs.containsKey(input.getId())) {
@@ -314,6 +333,14 @@ public class RunContext {
}
}
}
}
if (flow != null && flow.getInputs() != null) {
// we add default inputs value from the flow if not already set, this will be useful for triggers
flow.getInputs().stream()
.filter(input -> input.getDefaults() != null && !inputs.containsKey(input.getId()))
.forEach(input -> inputs.put(input.getId(), input.getDefaults()));
}
if (!inputs.isEmpty()) {
builder.put("inputs", inputs);
}
@@ -324,6 +351,7 @@ public class RunContext {
if (execution.getLabels() != null) {
builder.put("labels", execution.getLabels()
.stream()
.filter(label -> label.value() != null)
.map(label -> new AbstractMap.SimpleEntry<>(
label.key(),
label.value()
@@ -565,7 +593,7 @@ public class RunContext {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private String decrypt(String encrypted) throws GeneralSecurityException {
public String decrypt(String encrypted) throws GeneralSecurityException {
if (secretKey.isPresent()) {
return EncryptionService.decrypt(secretKey.get(), encrypted);
} else {

View File

@@ -23,7 +23,11 @@ public class RunContextFactory {
}
public RunContext of(Flow flow, Task task, Execution execution, TaskRun taskRun) {
return new RunContext(applicationContext, flow, task, execution, taskRun);
return this.of(flow, task, execution, taskRun, true);
}
public RunContext of(Flow flow, Task task, Execution execution, TaskRun taskRun, boolean decryptVariables) {
return new RunContext(applicationContext, flow, task, execution, taskRun, decryptVariables);
}
public RunContext of(Flow flow, AbstractTrigger trigger) {

View File

@@ -20,17 +20,19 @@ import io.kestra.core.services.ConditionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.micronaut.context.annotation.Value;
import io.micronaut.http.multipart.StreamingFileUpload;
import io.micronaut.http.multipart.CompletedFileUpload;
import io.micronaut.http.multipart.CompletedPart;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.security.GeneralSecurityException;
@@ -76,51 +78,95 @@ public class RunnerUtils {
@Value("${kestra.encryption.secret-key}")
private Optional<String> secretKey;
public Map<String, Object> typedInputs(Flow flow, Execution execution, Map<String, Object> in, Publisher<StreamingFileUpload> files) throws IOException {
if (files == null) {
return this.typedInputs(flow, execution, in);
}
/**
* Utility method for retrieving types inputs for a flow.
*
* @param flow The Flow.
* @param execution The Execution.
* @param inputs The Flow's inputs.
* @return The Map of typed inputs.
*/
public Map<String, Object> typedInputs(final Flow flow,
final Execution execution,
final Publisher<CompletedPart> inputs) throws IOException {
return this.typedInputs(flow.getInputs(), execution, inputs);
}
Map<String, String> uploads = Flux.from(files)
/**
* Utility method for retrieving types inputs for a flow.
*
* @param inputs The inputs
* @param execution The Execution.
* @param in The Execution's inputs.
* @return The Map of typed inputs.
*/
public Map<String, Object> typedInputs(final List<Input<?>> inputs,
final Execution execution,
final Publisher<CompletedPart> in) throws IOException {
Map<String, Object> uploads = Flux.from(in)
.subscribeOn(Schedulers.boundedElastic())
.map(throwFunction(file -> {
File tempFile = File.createTempFile(file.getFilename() + "_", ".upl");
Publisher<Boolean> uploadPublisher = file.transferTo(tempFile);
Boolean bool = Mono.from(uploadPublisher).block();
.map(throwFunction(input -> {
if (input instanceof CompletedFileUpload fileUpload) {
try {
File tempFile = File.createTempFile(fileUpload.getFilename() + "_", ".upl");
try (var inputStream = fileUpload.getInputStream();
var outputStream = new FileOutputStream(tempFile)) {
long transferredBytes = inputStream.transferTo(outputStream);
if (transferredBytes == 0) {
fileUpload.discard();
throw new RuntimeException("Can't upload file: " + fileUpload.getFilename());
}
}
URI from = storageInterface.from(execution, fileUpload.getFilename(), tempFile);
if (!tempFile.delete()) {
tempFile.deleteOnExit();
}
if (!bool) {
throw new RuntimeException("Can't upload");
return new AbstractMap.SimpleEntry<>(
fileUpload.getFilename(),
(Object) from.toString()
);
} catch (IOException e) {
fileUpload.discard();
throw e;
}
} else {
return new AbstractMap.SimpleEntry<>(
input.getName(),
(Object) new String(input.getBytes())
);
}
URI from = storageInterface.from(execution, file.getFilename(), tempFile);
//noinspection ResultOfMethodCallIgnored
tempFile.delete();
return new AbstractMap.SimpleEntry<>(
file.getFilename(),
from.toString()
);
}))
.collectMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)
.block();
Map<String, Object> merged = new HashMap<>();
if (in != null) {
merged.putAll(in);
}
merged.putAll(uploads);
return this.typedInputs(inputs, execution, uploads);
}
return this.typedInputs(flow, execution, merged);
/**
* Utility method for retrieving types inputs for a flow.
*
* @param flow The Flow.
* @param execution The Execution.
* @param in The Execution's inputs.
* @return The Map of typed inputs.
*/
public Map<String, Object> typedInputs(
final Flow flow,
final Execution execution,
final Map<String, Object> in
) {
return this.typedInputs(
flow.getInputs(),
execution,
in
);
}
@SuppressWarnings({"unchecked", "rawtypes"})
public Map<String, Object> typedInputs(Flow flow, Execution execution, Map<String, Object> in) {
if (flow.getInputs() == null) {
return ImmutableMap.of();
}
Map<String, Object> results = flow
.getInputs()
public Map<String, Object> typedInputs(List<Input<?>> inputs, Execution execution, Map<String, Object> in) {
Map<String, Object> results = ListUtils.emptyOnNull(inputs)
.stream()
.map((Function<Input, Optional<AbstractMap.SimpleEntry<String, Object>>>) input -> {
Object current = in == null ? null : in.get(input.getId());

View File

@@ -25,13 +25,12 @@ import java.util.regex.Pattern;
@Singleton
public class VariableRenderer {
private static final Pattern RAW_PATTERN = Pattern.compile("\\{%[-]*\\s*raw\\s*[-]*%\\}(.*?)\\{%[-]*\\s*endraw\\s*[-]*%\\}");
private static final Pattern RAW_PATTERN = Pattern.compile("(\\{%-*\\s*raw\\s*-*%}(.*?)\\{%-*\\s*endraw\\s*-*%})");
public static final int MAX_RENDERING_AMOUNT = 100;
private PebbleEngine pebbleEngine;
private final PebbleEngine pebbleEngine;
private final VariableConfiguration variableConfiguration;
@SuppressWarnings("unchecked")
@Inject
public VariableRenderer(ApplicationContext applicationContext, @Nullable VariableConfiguration variableConfiguration) {
this.variableConfiguration = variableConfiguration != null ? variableConfiguration : new VariableConfiguration();
@@ -81,20 +80,18 @@ public class VariableRenderer {
return inline;
}
return recursive
String render = recursive
? renderRecursively(inline, variables)
: renderOnce(inline, variables);
return RAW_PATTERN.matcher(render).replaceAll("$2");
}
public String renderOnce(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
// pre-process raw tags
Matcher rawMatcher = RAW_PATTERN.matcher(inline);
Map<String, String> replacers = new HashMap<>((int) Math.ceil(rawMatcher.groupCount() / 0.75));
String result = rawMatcher.replaceAll(matchResult -> {
var uuid = UUID.randomUUID().toString();
replacers.put(uuid, matchResult.group(1));
return uuid;
});
String result = replaceRawTags(rawMatcher, replacers);
try {
PebbleTemplate compiledTemplate = this.pebbleEngine.getLiteralTemplate(result);
@@ -108,7 +105,6 @@ public class VariableRenderer {
if (e instanceof PebbleException) {
throw properPebbleException((PebbleException) e);
}
throw new IllegalVariableEvaluationException(e);
} else {
result = alternativeRender;
@@ -116,17 +112,38 @@ public class VariableRenderer {
}
// post-process raw tags
for (var entry : replacers.entrySet()) {
result = result.replace(entry.getKey(), entry.getValue());
}
result = putBackRawTags(replacers, result);
return result;
}
/**
* This method can be used in fallback for rendering an input string.
*
* @param e The exception that was throw by the default variable renderer.
* @param inline The expression to be rendered.
* @param variables The context variables.
* @return The rendered string.
*/
protected String alternativeRender(Exception e, String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return null;
}
private static String putBackRawTags(Map<String, String> replacers, String result) {
for (var entry : replacers.entrySet()) {
result = result.replace(entry.getKey(), entry.getValue());
}
return result;
}
private static String replaceRawTags(Matcher rawMatcher, Map<String, String> replacers) {
return rawMatcher.replaceAll(matchResult -> {
var uuid = UUID.randomUUID().toString();
replacers.put(uuid, matchResult.group(1));
return uuid;
});
}
public String renderRecursively(String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return this.renderRecursively(0, inline, variables);
}
@@ -180,7 +197,8 @@ public class VariableRenderer {
return Optional.of(this.render(string, variables, recursive));
}
return Optional.empty();
// Return the given object if it cannot be rendered.
return Optional.ofNullable(object);
}
public List<Object> renderList(List<Object> list, Map<String, Object> variables) throws IllegalVariableEvaluationException {

View File

@@ -39,10 +39,9 @@ public class RenderFunction implements Function {
recursiveArg = true;
}
if (!(recursiveArg instanceof Boolean)) {
if (!(recursiveArg instanceof Boolean recursive)) {
throw new PebbleException(null, "The 'render' function expects an optional argument 'recursive' with type boolean.", lineNumber, self.getName());
}
Boolean recursive = (Boolean) recursiveArg;
EvaluationContextImpl evaluationContext = (EvaluationContextImpl) context;
Map<String, Object> variables = evaluationContext.getScopeChain().getGlobalScopes().stream()

View File

@@ -61,12 +61,14 @@ public abstract class AbstractScheduler implements Scheduler {
private final TaskDefaultService taskDefaultService;
private final WorkerGroupService workerGroupService;
private final LogService logService;
protected SchedulerExecutionStateInterface executionState;
// must be volatile as it's updated by the flow listener thread and read by the scheduleExecutor thread
private volatile Boolean isReady = false;
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
@Getter
protected SchedulerTriggerStateInterface triggerState;
// schedulable and schedulableNextDate must be volatile and their access synchronized as they are updated and read by different threads.
@@ -270,7 +272,7 @@ public abstract class AbstractScheduler implements Scheduler {
logError(conditionContext, flow, abstractTrigger, e);
return null;
}
this.triggerState.save(triggerContext, scheduleContext);
this.triggerState.save(triggerContext, scheduleContext, "/kestra/services/scheduler/compute-schedulable/save/lastTrigger-nextDate-null");
} else {
triggerContext = lastTrigger;
}
@@ -372,7 +374,7 @@ public abstract class AbstractScheduler implements Scheduler {
Trigger triggerRunning = Trigger.of(f.getTriggerContext(), now);
try {
this.triggerState.save(triggerRunning, scheduleContext);
this.triggerState.save(triggerRunning, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/polling");
this.sendPollingTriggerToWorker(f);
} catch (InternalException e) {
logService.logTrigger(
@@ -398,7 +400,7 @@ public abstract class AbstractScheduler implements Scheduler {
schedule.nextEvaluationDate(f.getConditionContext(), Optional.of(f.getTriggerContext()))
);
trigger = trigger.checkBackfill();
this.triggerState.save(trigger, scheduleContext);
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-true/schedule");
}
} else {
logService.logTrigger(
@@ -416,7 +418,7 @@ public abstract class AbstractScheduler implements Scheduler {
logError(f, e);
}
var trigger = f.getTriggerContext().toBuilder().nextExecutionDate(nextExecutionDate).build().checkBackfill();
this.triggerState.save(trigger, scheduleContext);
this.triggerState.save(trigger, scheduleContext, "/kestra/services/scheduler/handle/save/on-eval-false");
}
} catch (InternalException ie) {
// validate schedule condition can fail to render variables
@@ -427,13 +429,13 @@ public abstract class AbstractScheduler implements Scheduler {
.tenantId(f.getTriggerContext().getTenantId())
.namespace(f.getTriggerContext().getNamespace())
.flowId(f.getTriggerContext().getFlowId())
.flowRevision(f.getTriggerContext().getFlowRevision())
.flowRevision(f.getFlow().getRevision())
.labels(f.getFlow().getLabels())
.state(new State().withState(State.Type.FAILED))
.build();
ZonedDateTime nextExecutionDate = f.getPollingTrigger().nextEvaluationDate();
var trigger = f.getTriggerContext().resetExecution(State.Type.FAILED, nextExecutionDate);
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
this.saveLastTriggerAndEmitExecution(execution, trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handle/save/on-error"));
}
});
});
@@ -473,7 +475,7 @@ public abstract class AbstractScheduler implements Scheduler {
// Schedule triggers are being executed directly from the handle method within the context where triggers are locked.
// So we must save them by passing the scheduleContext.
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext));
this.saveLastTriggerAndEmitExecution(result.getExecution(), trigger, triggerToSave -> this.triggerState.save(triggerToSave, scheduleContext, "/kestra/services/scheduler/handleEvaluateSchedulingTriggerResult/save"));
}
protected void saveLastTriggerAndEmitExecution(Execution execution, Trigger trigger, Consumer<Trigger> saveAction) {
@@ -491,8 +493,10 @@ public abstract class AbstractScheduler implements Scheduler {
return true;
}
// The execution is not yet started, we skip
if (lastTrigger.getExecutionCurrentState() == null) {
Optional<Execution> execution = executionState.findById(lastTrigger.getTenantId(), lastTrigger.getExecutionId());
// executionState hasn't received the execution, we skip
if (execution.isEmpty()) {
if (lastTrigger.getUpdatedDate() != null) {
metricRegistry
.timer(MetricRegistry.SCHEDULER_EXECUTION_MISSING_DURATION, metricRegistry.tags(lastTrigger))
@@ -519,6 +523,10 @@ public abstract class AbstractScheduler implements Scheduler {
.record(Duration.between(lastTrigger.getUpdatedDate(), Instant.now()));
}
// TODO if we set the state in the trigger after it has been started we can avoid getting the execution and
// check that if an executionId but no state, this means the execution is not started
// we need to have {@code lastTrigger.getExecutionId() == null} to be tell the execution is not running.
// the scheduler will clean the execution from the trigger and we don't keep only terminated state as an end.
if (log.isDebugEnabled()) {
logService.logTrigger(
f.getTriggerContext(),
@@ -526,7 +534,7 @@ public abstract class AbstractScheduler implements Scheduler {
Level.DEBUG,
"Execution '{}' is still '{}', updated at '{}'",
lastTrigger.getExecutionId(),
lastTrigger.getExecutionCurrentState(),
execution.get().getState().getCurrent(),
lastTrigger.getUpdatedDate()
);
}

View File

@@ -1,4 +1,11 @@
package io.kestra.core.schedulers;
import java.util.function.Consumer;
// For tests purpose
public class DefaultScheduleContext implements ScheduleContextInterface {}
public class DefaultScheduleContext implements ScheduleContextInterface {
@Override
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
consumer.accept(this);
}
}

View File

@@ -36,10 +36,12 @@ public class DefaultScheduler extends AbstractScheduler {
public DefaultScheduler(
ApplicationContext applicationContext,
FlowListenersInterface flowListeners,
SchedulerExecutionStateInterface executionState,
SchedulerTriggerStateInterface triggerState
) {
super(applicationContext, flowListeners);
this.triggerState = triggerState;
this.executionState = executionState;
this.conditionService = applicationContext.getBean(ConditionService.class);
this.flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);

View File

@@ -1,4 +1,14 @@
package io.kestra.core.schedulers;
import java.util.function.Consumer;
/**
* This context is used by the Scheduler to allow evaluating and updating triggers in a transaction from the main evaluation loop.
* See AbstractScheduler.handle().
*/
public interface ScheduleContextInterface {
/**
* Do trigger retrieval and updating in a single transaction.
*/
void doInTransaction(Consumer<ScheduleContextInterface> consumer);
}

View File

@@ -0,0 +1,19 @@
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import java.util.Optional;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
public class SchedulerExecutionState implements SchedulerExecutionStateInterface {
@Inject
private ExecutionRepositoryInterface executionRepository;
@Override
public Optional<Execution> findById(String tenantId, String id) {
return executionRepository.findById(tenantId, id);
}
}

View File

@@ -0,0 +1,9 @@
package io.kestra.core.schedulers;
import io.kestra.core.models.executions.Execution;
import java.util.Optional;
public interface SchedulerExecutionStateInterface {
Optional<Execution> findById(String tenantId, String id);
}

View File

@@ -20,12 +20,22 @@ public interface SchedulerTriggerStateInterface {
Trigger create(Trigger trigger) throws ConstraintViolationException;
Trigger save(Trigger trigger, ScheduleContextInterface scheduleContext, String headerContent) throws ConstraintViolationException;
Trigger create(Trigger trigger, String headerContent) throws ConstraintViolationException;
Trigger update(Trigger trigger);
Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;
/**
* Used by the JDBC implementation: find triggers in all tenants.
*/
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);
// Required for Kafka
/**
* Used by the Kafka implementation: find triggers in the scheduler assigned flow (as in Kafka partition assignment).
*/
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
}

View File

@@ -85,22 +85,22 @@ public class CollectorService {
return defaultUsage;
}
public Usage metrics() {
Usage.UsageBuilder<?, ?> builder = defaultUsage().toBuilder()
public Usage metrics(boolean details) {
Usage.UsageBuilder<?, ?> builder = defaultUsage()
.toBuilder()
.uuid(IdUtils.create());
if (serverType == ServerType.EXECUTOR || serverType == ServerType.STANDALONE) {
builder
if (details) {
builder = builder
.flows(FlowUsage.of(flowRepository))
.executions(ExecutionUsage.of(executionRepository));
}
return builder.build();
}
public void report() {
try {
Usage metrics = this.metrics();
Usage metrics = this.metrics(serverType == ServerType.EXECUTOR || serverType == ServerType.STANDALONE);
MutableHttpRequest<Usage> post = this.request(metrics);
if (log.isTraceEnabled()) {

View File

@@ -20,6 +20,7 @@ import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tasks.flows.Pause;
import io.kestra.core.tasks.flows.WorkingDirectory;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.IdUtils;
@@ -322,6 +323,41 @@ public class ExecutionService {
return unpausedExecution;
}
/**
* Resume a paused execution to a new state.
* Providing the flow, we can check if the PauseTask has subtasks,
* if not, we can directly set the task to success.
* The execution must be paused or this call will be a no-op.
*
* @param execution the execution to resume
* @param newState should be RUNNING or KILLING, other states may lead to undefined behaviour
* @param flow the flow of the execution
* @return the execution in the new state.
* @throws InternalException if the state of the execution cannot be updated
*/
public Execution resume(Execution execution, State.Type newState, Flow flow) throws InternalException {
var runningTaskRun = execution
.findFirstByState(State.Type.PAUSED)
.map(taskRun -> {
try {
Task task = flow.findTaskByTaskId(taskRun.getTaskId());
if (task instanceof Pause pauseTask && pauseTask.getTasks() == null && newState == State.Type.RUNNING) {
return taskRun.withState(State.Type.SUCCESS);
}
return taskRun.withState(newState);
} catch (InternalException e) {
throw new RuntimeException(e);
}
}
)
.orElseThrow(() -> new IllegalArgumentException("No paused task found on execution " + execution.getId()));
var unpausedExecution = execution
.withTaskRun(runningTaskRun)
.withState(newState);
this.eventPublisher.publishEvent(new CrudEvent<>(execution, CrudEventType.UPDATE));
return unpausedExecution;
}
/**
* Lookup for all executions triggered by given execution id, and returns all the relevant
* {@link ExecutionKilled events} that should be requested. This method is not responsible for executing the events.
@@ -432,7 +468,7 @@ public class ExecutionService {
return Stream
.concat(
execution
.findChilds(taskRun)
.findParents(taskRun)
.stream(),
Stream.of(taskRun)
)

View File

@@ -1,11 +1,15 @@
package io.kestra.core.services;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.hierarchies.*;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.Rethrow;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
@@ -19,20 +23,38 @@ public class GraphService {
@Inject
private FlowRepositoryInterface flowRepository;
@Inject
private TriggerRepositoryInterface triggerRepository;
@Inject
private TaskDefaultService taskDefaultService;
public FlowGraph flowGraph(Flow flow, List<String> expandedSubflows) throws IllegalVariableEvaluationException {
return FlowGraph.of(this.of(flow, Optional.ofNullable(expandedSubflows).orElse(Collections.emptyList()), new HashMap<>()));
return this.flowGraph(flow, expandedSubflows, null);
}
public FlowGraph flowGraph(Flow flow, List<String> expandedSubflows, Execution execution) throws IllegalVariableEvaluationException {
return FlowGraph.of(this.of(flow, Optional.ofNullable(expandedSubflows).orElse(Collections.emptyList()), new HashMap<>(), execution));
}
public GraphCluster of(Flow flow, List<String> expandedSubflows, Map<String, Flow> flowByUid) throws IllegalVariableEvaluationException {
return this.of(null, flow, expandedSubflows, flowByUid);
return this.of(flow, expandedSubflows, flowByUid, null);
}
public GraphCluster of(Flow flow, List<String> expandedSubflows, Map<String, Flow> flowByUid, Execution execution) throws IllegalVariableEvaluationException {
return this.of(null, flow, expandedSubflows, flowByUid, execution);
}
public GraphCluster of(GraphCluster baseGraph, Flow flow, List<String> expandedSubflows, Map<String, Flow> flowByUid) throws IllegalVariableEvaluationException {
return this.of(baseGraph, flow, expandedSubflows, flowByUid, null);
}
public GraphCluster of(GraphCluster baseGraph, Flow flow, List<String> expandedSubflows, Map<String, Flow> flowByUid, Execution execution) throws IllegalVariableEvaluationException {
String tenantId = flow.getTenantId();
flow = taskDefaultService.injectDefaults(flow);
GraphCluster graphCluster = GraphUtils.of(baseGraph, flow, null);
List<Trigger> triggers = null;
if (flow.getTriggers() != null) {
triggers = triggerRepository.find(Pageable.UNPAGED, null, tenantId, flow.getNamespace(), flow.getId());
}
GraphCluster graphCluster = GraphUtils.of(baseGraph, flow, execution, triggers);
Stream<Map.Entry<GraphCluster, SubflowGraphTask>> subflowToReplaceByParent = graphCluster.allNodesByParent().entrySet().stream()

View File

@@ -1,5 +1,8 @@
package io.kestra.core.services;
import com.google.common.annotations.VisibleForTesting;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import jakarta.inject.Singleton;
import java.util.Collections;
@@ -8,12 +11,67 @@ import java.util.List;
@Singleton
public class SkipExecutionService {
private volatile List<String> skipExecutions = Collections.emptyList();
private volatile List<FlowId> skipFlows = Collections.emptyList();
private volatile List<NamespaceId> skipNamespaces = Collections.emptyList();
private volatile List<String> skipTenants = Collections.emptyList();
public synchronized void setSkipExecutions(List<String> skipExecutions) {
this.skipExecutions = skipExecutions;
this.skipExecutions = skipExecutions == null ? Collections.emptyList() : skipExecutions;
}
public synchronized void setSkipFlows(List<String> skipFlows) {
this.skipFlows = skipFlows == null ? Collections.emptyList() : skipFlows.stream().map(FlowId::from).toList();
}
public synchronized void setSkipNamespaces(List<String> skipNamespaces) {
this.skipNamespaces = skipNamespaces == null ? Collections.emptyList() : skipNamespaces.stream().map(NamespaceId::from).toList();
}
public synchronized void setSkipTenants(List<String> skipTenants) {
this.skipTenants = skipTenants == null ? Collections.emptyList() : skipTenants;
}
/**
* Warning: this method didn't check the flow, so it must be used only when neither of the others can be used.
*/
public boolean skipExecution(String executionId) {
return skipExecutions.contains(executionId);
}
public boolean skipExecution(Execution execution) {
return skipExecution(execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId());
}
public boolean skipExecution(TaskRun taskRun) {
return skipExecution(taskRun.getTenantId(), taskRun.getNamespace(), taskRun.getFlowId(), taskRun.getExecutionId());
}
@VisibleForTesting
boolean skipExecution(String tenant, String namespace, String flow, String executionId) {
return (tenant != null && skipTenants.contains(tenant)) ||
skipNamespaces.contains(new NamespaceId(tenant, namespace)) ||
skipFlows.contains(new FlowId(tenant, namespace, flow)) ||
(executionId != null && skipExecutions.contains(executionId));
}
private static String[] splitIdParts(String id) {
return id.split("\\|");
}
record FlowId(String tenant, String namespace, String flow) {
static FlowId from(String flowId) {
String[] parts = SkipExecutionService.splitIdParts(flowId);
if (parts.length == 3) {
return new FlowId(parts[0], parts[1], parts[2]);
}
return new FlowId(null, parts[0], parts[1]);
}
};
record NamespaceId(String tenant, String namespace) {
static NamespaceId from(String namespaceId) {
String[] parts = SkipExecutionService.splitIdParts(namespaceId);
return new NamespaceId(parts[0], parts[1]);
}
};
}

View File

@@ -107,7 +107,7 @@ public abstract class StorageService {
writers.forEach(throwConsumer(RandomAccessFile::close));
return files;
return files.stream().filter(p -> p.toFile().length() > 0).toList();
}
}

View File

@@ -0,0 +1,7 @@
package io.kestra.core.tasks.flows;
public interface ChildFlowInterface {
String getNamespace();
String getFlowId();
}

View File

@@ -173,7 +173,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction;
)
}
)
public class ForEachItem extends Task implements FlowableTask<VoidOutput> {
public class ForEachItem extends Task implements FlowableTask<VoidOutput>, ChildFlowInterface {
@NotEmpty
@PluginProperty(dynamic = true)
@Schema(title = "The items to be split into batches and processed. Make sure to set it to Kestra's internal storage URI. This can be either the output from a previous task, formatted as `{{ outputs.task_id.uri }}`, or a FILE type input parameter, like `{{ inputs.myfile }}`. This task is optimized for files where each line represents a single item. Suitable file types include Amazon ION-type files (commonly produced by Query tasks), newline-separated JSON files, or CSV files formatted with one row per line and without a header. For files in other formats such as Excel, CSV, Avro, Parquet, XML, or JSON, it's recommended to first convert them to the ION format. This can be done using the conversion tasks available in the `io.kestra.plugin.serdes` module, which will transform files from their original format to ION.")

View File

@@ -123,7 +123,7 @@ public class Pause extends Sequential implements FlowableTask<VoidOutput> {
private boolean needPause(TaskRun parentTaskRun) {
return parentTaskRun.getState().getCurrent() == State.Type.RUNNING &&
parentTaskRun.getState().getHistories().get(parentTaskRun.getState().getHistories().size() - 2).getState() != State.Type.PAUSED;
parentTaskRun.getState().getHistories().stream().noneMatch(history -> history.getState() == State.Type.PAUSED);
}
@Override

View File

@@ -11,29 +11,14 @@ import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.runners.SubflowExecutionResult;
import io.kestra.core.runners.*;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.experimental.SuperBuilder;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.util.*;
import java.util.stream.Collectors;
@SuperBuilder
@@ -60,7 +45,7 @@ import java.util.stream.Collectors;
)
}
)
public class Subflow extends Task implements ExecutableTask<Subflow.Output> {
public class Subflow extends Task implements ExecutableTask<Subflow.Output>, ChildFlowInterface {
static final String PLUGIN_FLOW_OUTPUTS_ENABLED = "outputs.enabled";
@@ -150,6 +135,7 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output> {
if (this.labels != null) {
for (Map.Entry<String, String> entry : this.labels.entrySet()) {
labels.removeIf(label -> label.key().equals(entry.getKey()));
labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
}
}

View File

@@ -309,11 +309,16 @@ public class WorkingDirectory extends Sequential implements NamespaceFilesInterf
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ZipOutputStream archive = new ZipOutputStream(bos)) {
for (var file : matchesList) {
var relativeFileName = file.toFile().getPath().substring(runContext.tempDir().toString().length() + 1);
for (var path : matchesList) {
File file = path.toFile();
if (file.isDirectory() || !file.canRead()) {
continue;
}
var relativeFileName = file.getPath().substring(runContext.tempDir().toString().length() + 1);
var zipEntry = new ZipEntry(relativeFileName);
archive.putNextEntry(zipEntry);
archive.write(Files.readAllBytes(file));
archive.write(Files.readAllBytes(path));
archive.closeEntry();
}

View File

@@ -13,10 +13,7 @@ import io.kestra.core.serializers.JacksonMapper;
import io.micronaut.core.util.functional.ThrowingFunction;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.io.BufferedReader;
@@ -57,6 +54,7 @@ import java.util.Map;
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class DeduplicateItems extends Task implements RunnableTask<DeduplicateItems.Output> {
@Schema(

View File

@@ -15,10 +15,7 @@ import io.kestra.core.utils.TruthUtils;
import io.micronaut.core.util.functional.ThrowingFunction;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.*;
import lombok.experimental.SuperBuilder;
import java.io.BufferedReader;
@@ -56,6 +53,7 @@ import java.util.Map;
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public class FilterItems extends Task implements RunnableTask<FilterItems.Output> {
@Schema(

View File

@@ -9,27 +9,33 @@ import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.FlowableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.tasks.flows.Dag;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class GraphUtils {
public static FlowGraph flowGraph(Flow flow, Execution execution) throws IllegalVariableEvaluationException {
return FlowGraph.of(GraphUtils.of(flow, execution));
return GraphUtils.flowGraph(flow, execution, null);
}
public static GraphCluster of(GraphCluster graph, Flow flow, Execution execution) throws IllegalVariableEvaluationException {
public static FlowGraph flowGraph(Flow flow, Execution execution, List<Trigger> triggers) throws IllegalVariableEvaluationException {
return FlowGraph.of(GraphUtils.of(flow, execution, triggers));
}
public static GraphCluster of(GraphCluster graph, Flow flow, Execution execution, List<Trigger> triggers) throws IllegalVariableEvaluationException {
if (graph == null) {
graph = new GraphCluster();
}
if (flow.getTriggers() != null) {
GraphCluster triggers = GraphUtils.triggers(graph, flow.getTriggers());
graph.addEdge(triggers.getEnd(), graph.getRoot(), new Relation());
GraphCluster triggersClusters = GraphUtils.triggers(graph, flow.getTriggers(), triggers);
graph.addEdge(triggersClusters.getEnd(), graph.getRoot(), new Relation());
}
GraphUtils.sequential(
@@ -44,16 +50,29 @@ public class GraphUtils {
}
public static GraphCluster of(Flow flow, Execution execution) throws IllegalVariableEvaluationException {
return GraphUtils.of(new GraphCluster(), flow, execution);
return GraphUtils.of(flow, execution, null);
}
public static GraphCluster triggers(GraphCluster graph, List<AbstractTrigger> triggers) throws IllegalVariableEvaluationException {
public static GraphCluster of(Flow flow, Execution execution, List<Trigger> triggers) throws IllegalVariableEvaluationException {
return GraphUtils.of(new GraphCluster(), flow, execution, triggers);
}
public static GraphCluster triggers(GraphCluster graph, List<AbstractTrigger> triggersDeclarations, List<Trigger> triggers) throws IllegalVariableEvaluationException {
GraphCluster triggerCluster = new GraphCluster("Triggers");
graph.addNode(triggerCluster);
triggers.forEach(trigger -> {
GraphTrigger triggerNode = new GraphTrigger(trigger);
Map<String, Trigger> triggersById = Optional.ofNullable(triggers)
.map(Collection::stream)
.map(s -> s.collect(Collectors.toMap(
Trigger::getTriggerId,
Function.identity(),
(a, b) -> a.getNamespace().length() <= b.getNamespace().length() ? a : b
)))
.orElse(Collections.emptyMap());
triggersDeclarations.forEach(trigger -> {
GraphTrigger triggerNode = new GraphTrigger(trigger, triggersById.get(trigger.getId()));
triggerCluster.addNode(triggerNode);
triggerCluster.addEdge(triggerCluster.getRoot(), triggerNode, new Relation());
triggerCluster.addEdge(triggerNode, triggerCluster.getEnd(), new Relation());
@@ -264,7 +283,7 @@ public class GraphUtils {
);
if (execution != null && currentTaskRun != null) {
parentValues = execution.findChildsValues(currentTaskRun, true);
parentValues = execution.findParentsValues(currentTaskRun, true);
}
@@ -367,7 +386,7 @@ public class GraphUtils {
);
if (execution != null && currentTaskRun != null) {
parentValues = execution.findChildsValues(currentTaskRun, true);
parentValues = execution.findParentsValues(currentTaskRun, true);
}
// detect kids

View File

@@ -4,7 +4,6 @@ import com.google.common.collect.Lists;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@SuppressWarnings({"rawtypes", "unchecked"})
public class MapUtils {
@@ -23,12 +22,11 @@ public class MapUtils {
Map copy = copyMap(a);
Map<String, Object> copyMap = b
.entrySet()
.stream()
.collect(
HashMap::new,
() -> newHashMap(copy.size()),
(m, v) -> {
Object original = copy.get(v.getKey());
Object value = v.getValue();
@@ -45,19 +43,14 @@ public class MapUtils {
} else if (value instanceof Collection
&& original instanceof Collection) {
try {
Collection merge =
copyCollection(
found = Lists
.newArrayList(
(Collection) original,
(List) Lists
.newArrayList(
(Collection) original,
(Collection) value
)
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList())
);
found = merge;
(Collection) value
)
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -81,15 +74,15 @@ public class MapUtils {
.entrySet()
.stream()
.collect(
HashMap::new,
() -> newHashMap(original.size()),
(map, entry) -> {
Object value = entry.getValue();
Object found;
if (value instanceof Map) {
found = copyMap((Map) value);
found = cloneMap((Map) value);
} else if (value instanceof Collection) {
found = copyCollection((Collection) value, (Collection) value);
found = cloneCollection((Collection) value);
} else {
found = value;
}
@@ -101,9 +94,19 @@ public class MapUtils {
);
}
private static Collection copyCollection(Collection collection, Collection elements) {
private static Map cloneMap(Map elements) {
try {
Collection newInstance = collection.getClass().getDeclaredConstructor().newInstance();
Map newInstance = elements.getClass().getDeclaredConstructor().newInstance();
newInstance.putAll(elements);
return newInstance;
} catch (Exception e) {
return new HashMap(elements);
}
}
private static Collection cloneCollection(Collection elements) {
try {
Collection newInstance = elements.getClass().getDeclaredConstructor().newInstance();
newInstance.addAll(elements);
return newInstance;
} catch (Exception e) {
@@ -124,4 +127,17 @@ public class MapUtils {
// https://bugs.openjdk.org/browse/JDK-8148463
.collect(HashMap::new, (m, v) -> m.put(v.getKey(), v.getValue()), HashMap::putAll);
}
/**
* Creates a hash map that can hold <code>numMappings</code> entry.
* This is a copy of the same methods available starting with Java 19.
*/
public static <K, V> HashMap<K, V> newHashMap(int numMappings) {
if (numMappings < 0) {
throw new IllegalArgumentException("Negative number of mappings: " + numMappings);
}
int hashMapCapacity = (int) Math.ceil(numMappings / 0.75d);
return new HashMap<>(hashMapCapacity);
}
}

View File

@@ -0,0 +1,12 @@
package io.kestra.core.validations;
import jakarta.validation.Constraint;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = {})
public @interface TaskDefaultValidation {
String message() default "invalid taskDefault";
}

View File

@@ -0,0 +1,56 @@
package io.kestra.core.validations.validator;
import io.kestra.core.models.flows.TaskDefault;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.services.TaskDefaultService;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.validation.validator.constraints.ConstraintValidator;
import io.micronaut.validation.validator.constraints.ConstraintValidatorContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import io.kestra.core.validations.TaskDefaultValidation;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Singleton
@Introspected
public class TaskDefaultValidator implements ConstraintValidator<TaskDefaultValidation, TaskDefault> {
@Override
public boolean isValid(@Nullable TaskDefault value, @NonNull AnnotationValue<TaskDefaultValidation> annotationMetadata, @NonNull ConstraintValidatorContext context) {
if (value == null) {
return false;
}
List<String> violations = new ArrayList<>();
if (value.getValues() == null) {
violations.add("Null values map found");
context.messageTemplate("Invalid Task Default: " + String.join(", ", violations));
return false;
}
// Check if the "values" map is empty
for (Map.Entry<String, Object> entry : value.getValues().entrySet()) {
if (entry.getValue() == null) {
violations.add("Null value found in values with key " + entry.getKey());
}
}
if (!violations.isEmpty()) {
context.messageTemplate("Invalid Task Default: " + String.join(", ", violations));
return false;
} else {
return true;
}
}
}

View File

@@ -98,10 +98,10 @@ class ClassPluginDocumentationTest {
ClassPluginDocumentation<? extends AbstractTrigger> doc = ClassPluginDocumentation.of(jsonSchemaGenerator, scan, Schedule.class, null);
assertThat(doc.getDefs().size(), is(4));
assertThat(doc.getDefs().size(), is(1));
assertThat(((Map<String, Object>) doc.getDefs().get("io.kestra.core.models.conditions.ScheduleCondition")).get("type"), is("object"));
assertThat(((Map<String, Object>) ((Map<String, Object>) doc.getDefs().get("io.kestra.core.models.conditions.ScheduleCondition")).get("properties")).size(), is(0));
assertThat(((Map<String, Object>) doc.getDefs().get("io.kestra.core.models.tasks.WorkerGroup")).get("type"), is("object"));
assertThat(((Map<String, Object>) ((Map<String, Object>) doc.getDefs().get("io.kestra.core.models.tasks.WorkerGroup")).get("properties")).size(), is(1));
}));
}
}

View File

@@ -0,0 +1,22 @@
package io.kestra.core.encryption;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;
import java.security.GeneralSecurityException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class EncryptionServiceTest {
private static final String KEY = "I6EGNzRESu3X3pKZidrqCGOHQFUFC0yK";
@Test
public void avoidNpeForEmptyOrNullText() throws GeneralSecurityException {
assertThat(EncryptionService.encrypt(KEY, null), nullValue());
assertThat(EncryptionService.decrypt(KEY, null), nullValue());
assertThat(EncryptionService.encrypt(KEY, ""), is(""));
assertThat(EncryptionService.decrypt(KEY, ""), is(""));
}
}

View File

@@ -4,6 +4,10 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.types.Schedule;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.services.GraphService;
@@ -11,6 +15,7 @@ import io.kestra.core.tasks.flows.Subflow;
import io.kestra.core.tasks.flows.Switch;
import io.kestra.core.utils.GraphUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.data.model.Pageable;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
@@ -31,6 +36,12 @@ class FlowGraphTest extends AbstractMemoryRunnerTest {
@Inject
private GraphService graphService;
@Inject
private TriggerRepositoryInterface triggerRepositoryInterface;
@Inject
private FlowRepositoryInterface flowRepositoryInterface;
@Test
void simple() throws IllegalVariableEvaluationException {
Flow flow = this.parse("flows/valids/return.yaml");
@@ -114,9 +125,9 @@ class FlowGraphTest extends AbstractMemoryRunnerTest {
Flow flow = this.parse("flows/valids/switch.yaml");
FlowGraph flowGraph = GraphUtils.flowGraph(flow, null);
assertThat(flowGraph.getNodes().size(), is(14));
assertThat(flowGraph.getEdges().size(), is(17));
assertThat(flowGraph.getClusters().size(), is(2));
assertThat(flowGraph.getNodes().size(), is(17));
assertThat(flowGraph.getEdges().size(), is(20));
assertThat(flowGraph.getClusters().size(), is(3));
assertThat(edge(flowGraph, ".*parent-seq", ".*parent-seq\\.[^.]*").getRelation().getRelationType(), is(RelationType.CHOICE));
assertThat(edge(flowGraph, ".*parent-seq", ".*parent-seq\\.t3\\.[^.]*").getRelation().getValue(), is("THIRD"));
@@ -205,11 +216,17 @@ class FlowGraphTest extends AbstractMemoryRunnerTest {
@Test
void trigger() throws IllegalVariableEvaluationException {
Flow flow = this.parse("flows/valids/trigger-flow-listener.yaml");
FlowGraph flowGraph = GraphUtils.flowGraph(flow, null);
triggerRepositoryInterface.save(
Trigger.of(flow, flow.getTriggers().get(0)).toBuilder().disabled(true).build()
);
FlowGraph flowGraph = graphService.flowGraph(flow, null);
assertThat(flowGraph.getNodes().size(), is(6));
assertThat(flowGraph.getEdges().size(), is(5));
assertThat(flowGraph.getClusters().size(), is(1));
AbstractGraph triggerGraph = flowGraph.getNodes().stream().filter(e -> e instanceof GraphTrigger).findFirst().orElseThrow();
assertThat(((GraphTrigger) triggerGraph).getTrigger().getDisabled(), is(true));
}
@Test
@@ -227,15 +244,15 @@ class FlowGraphTest extends AbstractMemoryRunnerTest {
Flow flow = this.parse("flows/valids/task-flow.yaml");
FlowGraph flowGraph = GraphUtils.flowGraph(flow, null);
assertThat(flowGraph.getNodes().size(), is(3));
assertThat(flowGraph.getEdges().size(), is(2));
assertThat(flowGraph.getClusters().size(), is(0));
assertThat(flowGraph.getNodes().size(), is(6));
assertThat(flowGraph.getEdges().size(), is(5));
assertThat(flowGraph.getClusters().size(), is(1));
flowGraph = graphService.flowGraph(flow, Collections.singletonList("root.launch"));
assertThat(flowGraph.getNodes().size(), is(17));
assertThat(flowGraph.getEdges().size(), is(20));
assertThat(flowGraph.getClusters().size(), is(3));
assertThat(flowGraph.getNodes().size(), is(23));
assertThat(flowGraph.getEdges().size(), is(26));
assertThat(flowGraph.getClusters().size(), is(5));
assertThat(((SubflowGraphTask) ((SubflowGraphCluster) cluster(flowGraph, "root\\.launch").getCluster()).getTaskNode()).getExecutableTask().subflowId().flowId(), is("switch"));
SubflowGraphTask subflowGraphTask = (SubflowGraphTask) nodeByUid(flowGraph, "root.launch");
@@ -245,6 +262,11 @@ class FlowGraphTest extends AbstractMemoryRunnerTest {
GraphTask switchNode = (GraphTask) nodeByUid(flowGraph, "root.launch.parent-seq");
assertThat(switchNode.getTask(), instanceOf(Switch.class));
assertThat(switchNode.getRelationType(), is(RelationType.CHOICE));
GraphTrigger flowTrigger = (GraphTrigger) nodeByUid(flowGraph, "root.Triggers.schedule");
assertThat(flowTrigger.getTriggerDeclaration(), instanceOf(Schedule.class));
GraphTrigger subflowTrigger = (GraphTrigger) nodeByUid(flowGraph, "root.launch.Triggers.schedule");
assertThat(subflowTrigger.getTriggerDeclaration(), instanceOf(Schedule.class));
}
private Flow parse(String path) {

View File

@@ -72,6 +72,10 @@ public abstract class AbstractTriggerRepositoryTest {
assertThat(find.size(), is(4));
assertThat(find.get(0).getNamespace(), is(namespace));
find = triggerRepository.find(Pageable.from(1, 4, Sort.of(Sort.Order.asc("namespace"))), null, null, null, searchedTrigger.getFlowId());
assertThat(find.size(), is(1));
assertThat(find.get(0).getFlowId(), is(searchedTrigger.getFlowId()));
find = triggerRepository.find(Pageable.from(1, 100, Sort.of(Sort.Order.asc(triggerRepository.sortMapping().apply("triggerId")))), null, null, namespacePrefix);
assertThat(find.size(), is(1));
assertThat(find.get(0).getTriggerId(), is(trigger.getTriggerId()));

View File

@@ -1,17 +1,24 @@
package io.kestra.core.runners;
import io.kestra.core.encryption.EncryptionService;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.tasks.common.EncryptedString;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tasks.test.PollingTrigger;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Value;
@@ -245,4 +252,24 @@ class RunContextTest extends AbstractMemoryRunnerTest {
// the output is automatically decrypted so the return has the decrypted value of the hello task output
assertThat(returnTask.getOutputs().get("value"), is("Hello World"));
}
@Test
void withDefaultInput() throws IllegalVariableEvaluationException {
Flow flow = Flow.builder().id("triggerWithDefaultInput").namespace("io.kestra.test").revision(1).inputs(List.of(StringInput.builder().id("test").type(Type.STRING).defaults("test").build())).build();
Execution execution = Execution.builder().id(IdUtils.create()).flowId("triggerWithDefaultInput").namespace("io.kestra.test").state(new State()).build();
RunContext runContext = runContextFactory.of(flow, execution);
assertThat(runContext.render("{{inputs.test}}"), is("test"));
}
@Test
void withNullLabel() throws IllegalVariableEvaluationException {
Flow flow = Flow.builder().id("triggerWithDefaultInput").namespace("io.kestra.test").revision(1).inputs(List.of(StringInput.builder().id("test").type(Type.STRING).defaults("test").build())).build();
Execution execution = Execution.builder().id(IdUtils.create()).flowId("triggerWithDefaultInput").namespace("io.kestra.test").state(new State()).labels(List.of(new Label("key", null))).build();
RunContext runContext = runContextFactory.of(flow, execution);
assertThat(runContext.render("{{inputs.test}}"), is("test"));
}
}

View File

@@ -0,0 +1,43 @@
package io.kestra.core.runners;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Map;
@MicronautTest
class VariableRendererTest {
@Inject
ApplicationContext applicationContext;
@Inject
VariableRenderer.VariableConfiguration variableConfiguration;
@Test
void shouldRenderUsingAlternativeRendering() throws IllegalVariableEvaluationException {
TestVariableRenderer renderer = new TestVariableRenderer(applicationContext, variableConfiguration);
String render = renderer.render("{{ dummy }}", Map.of());
Assertions.assertEquals("result", render);
}
public static class TestVariableRenderer extends VariableRenderer {
public TestVariableRenderer(ApplicationContext applicationContext,
VariableConfiguration variableConfiguration) {
super(applicationContext, variableConfiguration);
}
@Override
protected String alternativeRender(Exception e, String inline, Map<String, Object> variables) throws IllegalVariableEvaluationException {
return "result";
}
}
}

View File

@@ -10,6 +10,8 @@ import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
@@ -63,4 +65,9 @@ class RecursivePebbleVariableRendererTest {
);
assertThat(illegalVariableEvaluationException.getMessage(), containsString("Function or Macro [render] does not exist"));
}
@Test
void renderFunctionKeepRaw() throws IllegalVariableEvaluationException {
assertThat(variableRenderer.render("{% raw %}{{first}}{% endraw %}", Collections.emptyMap()), is("{{first}}"));
}
}

View File

@@ -0,0 +1,62 @@
package io.kestra.core.runners.pebble.functions;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.VariableRenderer;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.*;
import java.util.HashMap;
import java.util.Map;
@MicronautTest
class RenderFunctionTest {
@Inject
VariableRenderer variableRenderer;
@Test
void shouldRenderForString() throws IllegalVariableEvaluationException {
String rendered = variableRenderer.render("{{ render(input) }}", Map.of("input", "test"));
Assertions.assertEquals("test", rendered);
}
@Test
void shouldRenderForInteger() throws IllegalVariableEvaluationException {
String rendered = variableRenderer.render("{{ render(input) }}", Map.of("input", 42));
Assertions.assertEquals("42", rendered);
}
@Test
void shouldRenderForLong() throws IllegalVariableEvaluationException {
String rendered = variableRenderer.render("{{ render(input) }}", Map.of("input", 42L));
Assertions.assertEquals("42", rendered);
}
@Test
void shouldRenderForBoolean() throws IllegalVariableEvaluationException {
String rendered = variableRenderer.render("{{ render(input) }}", Map.of("input", true));
Assertions.assertEquals("true", rendered);
}
@Test
void shouldRenderForNull() throws IllegalVariableEvaluationException {
String rendered = variableRenderer.render("{{ render(input) }}", new HashMap<>(){{put("input", null);}});
Assertions.assertEquals("", rendered);
}
@Test
void shouldRenderForDateTime() throws IllegalVariableEvaluationException {
Instant now = Instant.now();
LocalDateTime datetime = LocalDateTime.ofInstant(now, ZoneOffset.UTC);
String rendered = variableRenderer.render("{{ render(input) }}", Map.of("input", datetime));
Assertions.assertEquals(datetime.toString(), rendered);
}
@Test
void shouldRenderForDuration() throws IllegalVariableEvaluationException {
String rendered = variableRenderer.render("{{ render(input) }}", Map.of("input", Duration.ofSeconds(5)));
Assertions.assertEquals(Duration.ofSeconds(5).toString(), rendered);
}
}

View File

@@ -7,7 +7,6 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.types.Schedule;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.TestMethodScopedWorker;
import io.kestra.core.runners.Worker;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
@@ -17,13 +16,13 @@ import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;
class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
@@ -32,6 +31,9 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
private static Flow createScheduleFlow() {
Schedule schedule = Schedule.builder()
.id("hourly")
@@ -57,6 +59,7 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionRepositorySpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(4);
Flow flow = createScheduleFlow();
@@ -74,12 +77,22 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
.when(flowListenersServiceSpy)
.flows();
// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(executionRepositorySpy)
.findById(any(), any());
// start the worker as it execute polling triggers
Worker worker = new Worker(applicationContext, 8, null);
worker.run();
// scheduler
try (AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
triggerState);
Worker worker = new TestMethodScopedWorker(applicationContext, 8, null)) {
executionRepositorySpy,
triggerState
)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(SchedulerConditionTest.class, either -> {
Execution execution = either.getLeft();
@@ -96,8 +109,6 @@ class SchedulerConditionTest extends AbstractSchedulerTest {
scheduler.run();
queueCount.await(15, TimeUnit.SECONDS);
// needed for RetryingTest to work since there is no context cleaning between method => we have to clear assertion receiver manually
assertionStop.run();
assertThat(queueCount.getCount(), is(0L));
}

View File

@@ -35,6 +35,9 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
@Inject
private SchedulerTriggerStateInterface triggerState;
@Inject
private SchedulerExecutionState schedulerExecutionState;
@Inject
private FlowListeners flowListenersService;
@@ -167,6 +170,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
assertThat(queueCount.getCount(), is(0L));
assertThat(last.get(), notNullValue());
assertThat(last.get().getFlowRevision(), notNullValue());
assertThat(last.get().getState().getCurrent(), is(State.Type.FAILED));
}
}
@@ -187,6 +191,7 @@ public class SchedulerPollingTriggerTest extends AbstractSchedulerTest {
return new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
schedulerExecutionState,
triggerState
);
}

View File

@@ -35,6 +35,9 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
protected QueueInterface<LogEntry> logQueue;
@@ -62,10 +65,11 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
.truncatedTo(ChronoUnit.HOURS);
}
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
executionStateSpy,
triggerState
);
}
@@ -75,6 +79,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
void schedule() throws Exception {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface executionStateSpy = spy(this.executionState);
CountDownLatch queueCount = new CountDownLatch(6);
CountDownLatch invalidLogCount = new CountDownLatch(1);
Set<String> date = new HashSet<>();
@@ -109,7 +114,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger.toBuilder().triggerId("schedule-invalid").flowId(invalid.getId()).build());
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionStateSpy)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
@@ -169,7 +174,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
Await.until(() -> {
@@ -203,7 +208,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
@@ -249,7 +254,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
@@ -293,7 +298,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(lastTrigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
Await.until(() -> scheduler.isReady(), Duration.ofMillis(100), Duration.ofSeconds(5));
@@ -324,7 +329,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
.build();
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
Await.until(() -> {
@@ -389,7 +394,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
triggerState.create(trigger);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
scheduler.run();
// Wait 3s to see if things happen
@@ -427,7 +432,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(2);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();
@@ -488,7 +493,7 @@ public class SchedulerScheduleTest extends AbstractSchedulerTest {
CountDownLatch queueCount = new CountDownLatch(1);
// scheduler
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy)) {
try (AbstractScheduler scheduler = scheduler(flowListenersServiceSpy, executionState)) {
// wait for execution
Runnable assertionStop = executionQueue.receive(either -> {
Execution execution = either.getLeft();

View File

@@ -15,14 +15,14 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;
public class SchedulerThreadTest extends AbstractSchedulerTest {
@Inject
@@ -31,6 +31,9 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;
@Inject
protected SchedulerExecutionStateInterface executionState;
public static Flow createThreadFlow() {
return createThreadFlow(null);
}
@@ -71,17 +74,23 @@ public class SchedulerThreadTest extends AbstractSchedulerTest {
// mock flow listeners
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);
SchedulerExecutionStateInterface schedulerExecutionStateSpy = spy(this.executionState);
doReturn(Collections.singletonList(flow))
.when(flowListenersServiceSpy)
.flows();
// mock the backfill execution is ended
doAnswer(invocation -> Optional.of(Execution.builder().state(new State().withState(State.Type.SUCCESS)).build()))
.when(schedulerExecutionStateSpy)
.findById(any(), any());
// scheduler
try (
AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
schedulerExecutionStateSpy,
triggerState
);
Worker worker = new TestMethodScopedWorker(applicationContext, 8, null);

View File

@@ -30,7 +30,7 @@ class CollectorServiceTest {
try (ApplicationContext applicationContext = Helpers.applicationContext(properties).start()) {
CollectorService collectorService = applicationContext.getBean(CollectorService.class);
Usage metrics = collectorService.metrics();
Usage metrics = collectorService.metrics(true);
assertThat(metrics.getUri(), is("https://mysuperhost.com/subpath"));
@@ -45,6 +45,9 @@ class CollectorServiceTest {
assertThat(metrics.getHost().getOs().getFamily(), notNullValue());
assertThat(metrics.getConfigurations().getRepositoryType(), is("memory"));
assertThat(metrics.getConfigurations().getQueueType(), is("memory"));
assertThat(metrics.getExecutions(), notNullValue());
assertThat(metrics.getExecutions().getDailyExecutionsCount().size(), is(0));
assertThat(metrics.getExecutions().getDailyTaskRunsCount().size(), is(0));
assertThat(metrics.getInstanceUuid(), is(TestSettingRepository.instanceUuid));
}
}

View File

@@ -1,5 +1,7 @@
package io.kestra.core.services;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.flows.input.StringInput;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.flows.Flow;
@@ -115,4 +117,29 @@ class FlowServiceTest {
assertThat(warnings.size(), is(1));
assertThat(warnings.get(0), containsString("The system namespace is reserved for background workflows"));
}
@Test
void propertyRenamingDeprecation() {
Flow flow = Flow.builder()
.id("flowId")
.namespace("io.kestra.unittest")
.inputs(List.of(
StringInput.builder()
.id("inputWithId")
.type(Type.STRING)
.build(),
StringInput.builder()
.name("inputWithName")
.type(Type.STRING)
.build()
))
.tasks(Collections.singletonList(Return.builder()
.id("taskId")
.type(Return.class.getName())
.format("test")
.build()))
.build();
assertThat(flowService.deprecationPaths(flow), is(List.of("inputs[1].name")));
}
}

View File

@@ -1,7 +1,10 @@
package io.kestra.core.services;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
@@ -14,8 +17,16 @@ class SkipExecutionServiceTest {
@Inject
private SkipExecutionService skipExecutionService;
@BeforeEach
void resetAll() {
skipExecutionService.setSkipExecutions(null);
skipExecutionService.setSkipFlows(null);
skipExecutionService.setSkipNamespaces(null);
skipExecutionService.setSkipTenants(null);
}
@Test
void test() {
void skipExecutionByExecutionId() {
var executionToSkip = "aaabbbccc";
var executionNotToSkip = "bbbcccddd";
@@ -24,4 +35,64 @@ class SkipExecutionServiceTest {
assertThat(skipExecutionService.skipExecution(executionToSkip), is(true));
assertThat(skipExecutionService.skipExecution(executionNotToSkip), is(false));
}
@Test
void skipExecutionByExecution() {
var executionToSkip = Execution.builder().id("skip").build();
var executionToSkipByFlow = Execution.builder().id("id").namespace("namespace").flowId("skip").build();
skipExecutionService.setSkipExecutions(List.of("skip"));
skipExecutionService.setSkipFlows(List.of("namespace|skip"));
assertThat(skipExecutionService.skipExecution(executionToSkip), is(true));
assertThat(skipExecutionService.skipExecution(executionToSkipByFlow), is(true));
}
@Test
void skipExecutionByTaskRun() {
var taskRunToSkip = TaskRun.builder().executionId("skip").build();
var taskRunToSkipByFlow = TaskRun.builder().id("id").namespace("namespace").flowId("skip").executionId("keep").build();
skipExecutionService.setSkipExecutions(List.of("skip"));
skipExecutionService.setSkipFlows(List.of("namespace|skip"));
assertThat(skipExecutionService.skipExecution(taskRunToSkip), is(true));
assertThat(skipExecutionService.skipExecution(taskRunToSkipByFlow), is(true));
}
@Test
void skipExecutionByFlowId() {
var flowToSkip = "namespace|skip";
var flowToSkipWithTenant = "tenant|namespace|skip";
skipExecutionService.setSkipFlows(List.of(flowToSkip, flowToSkipWithTenant));
assertThat(skipExecutionService.skipExecution(null, "namespace", "skip", "random"), is(true));
assertThat(skipExecutionService.skipExecution(null, "wrong", "skip", "random"), is(false));
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "skip", "random"), is(true));
assertThat(skipExecutionService.skipExecution("wrong", "namespace", "skip", "random"), is(false));
assertThat(skipExecutionService.skipExecution(null, "namespace", "not_skipped", "random"), is(false));
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "not_skipped", "random"), is(false));
}
@Test
void skipExecutionByNamespace() {
skipExecutionService.setSkipNamespaces(List.of("tenant|namespace"));
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "someFlow", "someExecution"), is(true));
assertThat(skipExecutionService.skipExecution(null, "namespace", "someFlow", "someExecution"), is(false));
assertThat(skipExecutionService.skipExecution("anotherTenant", "namespace", "someFlow", "someExecution"), is(false));
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "anotherFlow", "anotherExecution"), is(true));
assertThat(skipExecutionService.skipExecution("tenant", "other.namespace", "someFlow", "someExecution"), is(false));
}
@Test
void skipExecutionByTenantId() {
skipExecutionService.setSkipTenants(List.of("tenant"));
assertThat(skipExecutionService.skipExecution("tenant", "namespace", "someFlow", "someExecution"), is(true));
assertThat(skipExecutionService.skipExecution("anotherTenant", "namespace", "someFlow", "someExecution"), is(false));
assertThat(skipExecutionService.skipExecution("tenant", "another.namespace", "someFlow", "someExecution"), is(true));
assertThat(skipExecutionService.skipExecution("anotherTenant", "another.namespace", "someFlow", "someExecution"), is(false));
}
}

View File

@@ -9,6 +9,7 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@MicronautTest
class InputTest {
@@ -25,4 +26,26 @@ class InputTest {
assertThat(modelValidator.isValid(validInput).isEmpty(), is(true));
}
@Test
void inputNameDeprecation() {
String id = "test";
StringInput validInput = StringInput.builder()
.id(id)
.type(Type.STRING)
.build();
assertThat(validInput.getId(), is(id));
assertThat(validInput.getName(), nullValue());
String newName = "newName";
validInput = StringInput.builder()
.type(Type.STRING)
.build();
validInput.setName(newName);
assertThat(validInput.getId(), is(newName));
assertThat(validInput.getName(), is(newName));
}
}

View File

@@ -0,0 +1,40 @@
package io.kestra.core.validations;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.TaskDefault;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
@MicronautTest
class TaskDefaultValidationTest {
@Inject
private ModelValidator modelValidator;
@Test
void nullValue() {
TaskDefault taskDefault = TaskDefault.builder()
.type("io.kestra.tests")
.build();
Optional<ConstraintViolationException> validate = modelValidator.isValid(taskDefault);
assertThat(validate.isPresent(), is(true));
}
}

View File

@@ -0,0 +1,20 @@
id: restart_pause_last_failed
namespace: io.kestra.tests
tasks:
- id: a
type: io.kestra.core.tasks.log.Log
message: "{{ task.id }}"
- id: b
type: io.kestra.core.tasks.log.Log
message: "{{ task.id }}"
- id: pause
type: io.kestra.core.tasks.flows.Pause
delay: PT1S
tasks:
- id: c
type: io.kestra.core.tasks.log.Log
message: "{{taskrun.attemptsCount == 1 ? 'ok' : ko}}"
- id: d
type: io.kestra.core.tasks.log.Log
message: "{{ task.id }}"

View File

@@ -41,3 +41,8 @@ tasks:
- id: default
type: io.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{ inputs.def }} > {{taskrun.startDate}}"
triggers:
- id: schedule
type: io.kestra.core.models.triggers.types.Schedule
cron: "0 * * * *"

View File

@@ -21,4 +21,9 @@ tasks:
labels:
launchTaskLabel: launchFoo
outputs:
extracted: "{{ outputs.default.value ?? outputs['error-t1'].value }}"
extracted: "{{ outputs.default.value ?? outputs['error-t1'].value }}"
triggers:
- id: schedule
type: io.kestra.core.models.triggers.types.Schedule
cron: "0 * * * *"

View File

@@ -1,4 +1,4 @@
version=0.15.0
version=0.15.38
jacksonVersion=2.16.1
micronautVersion=4.3.4
@@ -7,4 +7,5 @@ slf4jVersion=2.0.12
org.gradle.parallel=true
org.gradle.caching=true
org.gradle.priority=low
org.gradle.priority=low

View File

@@ -2,12 +2,13 @@ package io.kestra.schedulers.h2;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;
class H2SchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy

View File

@@ -7,9 +7,14 @@ import org.jooq.*;
import org.jooq.Record;
import org.jooq.impl.DSL;
import java.util.List;
import java.util.*;
import java.util.stream.Collectors;
public class MysqlQueue<T> extends JdbcQueue<T> {
// TODO - remove once 'queue' table is re-designed
private static final MysqlQueueConsumers QUEUE_CONSUMERS = new MysqlQueueConsumers();
public MysqlQueue(Class<T> cls, ApplicationContext applicationContext) {
super(cls, applicationContext);
}
@@ -54,7 +59,7 @@ public class MysqlQueue<T> extends JdbcQueue<T> {
.where(AbstractJdbcRepository.field("type").eq(this.cls.getName()))
.and(DSL.or(List.of(
AbstractJdbcRepository.field("consumers").isNull(),
DSL.condition("NOT(FIND_IN_SET(?, consumers) > 0)", queueType)
AbstractJdbcRepository.field("consumers").in(QUEUE_CONSUMERS.allForConsumerNotIn(queueType))
)));
if (consumerGroup != null) {
@@ -90,4 +95,44 @@ public class MysqlQueue<T> extends JdbcQueue<T> {
update.execute();
}
private static final class MysqlQueueConsumers {
private static final Set<String> CONSUMERS;
static {
CONSUMERS = new HashSet<>();
String[] elements = {"indexer", "executor", "worker", "scheduler"};
List<String> results = new ArrayList<>();
// Generate all combinations and their permutations
generateCombinations(elements, new boolean[elements.length], new ArrayList<>(), results);
CONSUMERS.addAll(results);
}
public Set<String> allForConsumerNotIn(String consumer) {
return CONSUMERS.stream().filter(s -> !s.contains(consumer)).collect(Collectors.toSet());
}
private static void generateCombinations(String[] elements, boolean[] used, List<String> current, List<String> results) {
if (!current.isEmpty()) {
results.add(String.join(",", current));
}
for (int i = 0; i < elements.length; i++) {
if (!used[i]) {
used[i] = true;
current.add(elements[i]);
generateCombinations(elements, used, current, results);
if (current.isEmpty()) {
throw new NoSuchElementException();
} else {
current.remove(current.size() - 1);
}
used[i] = false;
}
}
}
}
}

View File

@@ -2,12 +2,13 @@ package io.kestra.schedulers.mysql;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;
class MysqlSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy

View File

@@ -2,12 +2,13 @@ package io.kestra.schedulers.postgres;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerScheduleTest;
import io.kestra.jdbc.runner.JdbcScheduler;
class PostgresSchedulerScheduleTest extends SchedulerScheduleTest {
@Override
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy) {
protected AbstractScheduler scheduler(FlowListeners flowListenersServiceSpy, SchedulerExecutionStateInterface executionStateSpy) {
return new JdbcScheduler(
applicationContext,
flowListenersServiceSpy

View File

@@ -169,7 +169,6 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
Trigger current = optionalTrigger.get();
current = current.toBuilder()
.executionId(trigger.getExecutionId())
.executionCurrentState(trigger.getExecutionCurrentState())
.updatedDate(trigger.getUpdatedDate())
.build();
this.save(context, current);

View File

@@ -291,7 +291,7 @@ public class JdbcExecutor implements ExecutorInterface {
workerJobRunningRepository.getWorkerJobWithWorkerDead(context, workersToDeleteUuids)
.forEach(workerJobRunning -> {
if (workerJobRunning instanceof WorkerTaskRunning workerTaskRunning) {
if (skipExecutionService.skipExecution(workerTaskRunning.getTaskRun().getExecutionId())) {
if (skipExecutionService.skipExecution(workerTaskRunning.getTaskRun())) {
// if the execution is skipped, we remove the workerTaskRunning and skip its resubmission
log.warn("Skipping execution {}", workerTaskRunning.getTaskRun().getId());
workerJobRunningRepository.deleteByKey(workerTaskRunning.uid());
@@ -345,7 +345,7 @@ public class JdbcExecutor implements ExecutorInterface {
}
Execution message = either.getLeft();
if (skipExecutionService.skipExecution(message.getId())) {
if (skipExecutionService.skipExecution(message)) {
log.warn("Skipping execution {}", message.getId());
return;
}
@@ -523,7 +523,7 @@ public class JdbcExecutor implements ExecutorInterface {
}
WorkerTaskResult message = either.getLeft();
if (skipExecutionService.skipExecution(message.getTaskRun().getExecutionId())) {
if (skipExecutionService.skipExecution(message.getTaskRun())) {
log.warn("Skipping execution {}", message.getTaskRun().getExecutionId());
return;
}
@@ -609,7 +609,7 @@ public class JdbcExecutor implements ExecutorInterface {
log.warn("Skipping execution {}", message.getExecutionId());
return;
}
if (skipExecutionService.skipExecution(message.getParentTaskRun().getExecutionId())) {
if (skipExecutionService.skipExecution(message.getParentTaskRun())) {
log.warn("Skipping execution {}", message.getParentTaskRun().getExecutionId());
return;
}

View File

@@ -32,10 +32,10 @@ import java.util.function.BiConsumer;
public class JdbcScheduler extends AbstractScheduler {
private final QueueInterface<Execution> executionQueue;
private final TriggerRepositoryInterface triggerRepository;
private final ConditionService conditionService;
private final FlowRepositoryInterface flowRepository;
private final JooqDSLContextWrapper dslContextWrapper;
private final ConditionService conditionService;
@SuppressWarnings("unchecked")
@@ -49,6 +49,7 @@ public class JdbcScheduler extends AbstractScheduler {
executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
triggerRepository = applicationContext.getBean(AbstractJdbcTriggerRepository.class);
triggerState = applicationContext.getBean(SchedulerTriggerStateInterface.class);
executionState = applicationContext.getBean(SchedulerExecutionState.class);
conditionService = applicationContext.getBean(ConditionService.class);
flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
dslContextWrapper = applicationContext.getBean(JooqDSLContextWrapper.class);
@@ -58,6 +59,7 @@ public class JdbcScheduler extends AbstractScheduler {
public void run() {
super.run();
// reset scheduler trigger at end
executionQueue.receive(
Scheduler.class,
either -> {
@@ -76,14 +78,6 @@ public class JdbcScheduler extends AbstractScheduler {
.ifPresent(trigger -> {
this.triggerState.update(trigger.resetExecution(execution.getState().getCurrent()));
});
} else {
// update execution state on each state change so the scheduler knows the execution is running
triggerRepository
.findByExecution(execution)
.filter(trigger -> execution.getState().getCurrent() != trigger.getExecutionCurrentState())
.ifPresent(trigger -> {
((JdbcSchedulerTriggerState) this.triggerState).updateExecution(Trigger.of(execution, trigger));
});
}
}
}
@@ -106,7 +100,7 @@ public class JdbcScheduler extends AbstractScheduler {
public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper);
schedulerContext.startTransaction(scheduleContextInterface -> {
schedulerContext.doInTransaction(scheduleContextInterface -> {
List<Trigger> triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface);
consumer.accept(triggers, scheduleContextInterface);

View File

@@ -18,17 +18,14 @@ public class JdbcSchedulerContext implements ScheduleContextInterface {
this.dslContextWrapper = dslContextWrapper;
}
public void startTransaction(Consumer<ScheduleContextInterface> consumer) {
@Override
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
this.dslContextWrapper.transaction(configuration -> {
this.context = DSL.using(configuration);
consumer.accept(this);
this.commit();
this.context.commit();
});
}
public void commit() {
this.context.commit();
}
}

View File

@@ -54,6 +54,18 @@ public class JdbcSchedulerTriggerState implements SchedulerTriggerStateInterface
return trigger;
}
@Override
public Trigger create(Trigger trigger, String headerContent) {
return this.triggerRepository.create(trigger);
}
@Override
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
this.triggerRepository.save(trigger, scheduleContextInterface);
return trigger;
}
@Override
public Trigger create(Trigger trigger) {

View File

@@ -65,6 +65,21 @@ public class MemoryTriggerRepository implements TriggerRepositoryInterface {
@Override
public ArrayListTotal<Trigger> find(Pageable from, String query, String tenantId, String namespace, String flowId) {
throw new UnsupportedOperationException();
List<Trigger> filteredTriggers = triggers.stream().filter(trigger -> {
if (tenantId != null && !tenantId.equals(trigger.getTenantId())) {
return false;
}
if (namespace != null && !namespace.equals(trigger.getNamespace())) {
return false;
}
if (flowId != null && !flowId.equals(trigger.getFlowId())) {
return false;
}
return true;
}).toList();
return new ArrayListTotal<>(filteredTriggers, filteredTriggers.size());
}
}

View File

@@ -131,7 +131,7 @@ public class MemoryExecutor implements ExecutorInterface {
}
Execution message = either.getLeft();
if (skipExecutionService.skipExecution(message.getId())) {
if (skipExecutionService.skipExecution(message)) {
log.warn("Skipping execution {}", message.getId());
return;
}
@@ -365,7 +365,7 @@ public class MemoryExecutor implements ExecutorInterface {
WorkerTaskResult message = either.getLeft();
if (skipExecutionService.skipExecution(message.getTaskRun().getExecutionId())) {
if (skipExecutionService.skipExecution(message.getTaskRun())) {
log.warn("Skipping execution {}", message.getTaskRun().getExecutionId());
return;
}
@@ -422,7 +422,7 @@ public class MemoryExecutor implements ExecutorInterface {
log.warn("Skipping execution {}", message.getExecutionId());
return;
}
if (skipExecutionService.skipExecution(message.getParentTaskRun().getExecutionId())) {
if (skipExecutionService.skipExecution(message.getParentTaskRun())) {
log.warn("Skipping execution {}", message.getParentTaskRun().getExecutionId());
return;
}

View File

@@ -56,6 +56,22 @@ public class MemorySchedulerTriggerState implements SchedulerTriggerStateInterfa
return trigger;
}
@Override
public Trigger save(Trigger trigger, ScheduleContextInterface scheduleContextInterface, String headerContent) {
triggers.put(trigger.uid(), trigger);
triggerQueue.emit(trigger);
return trigger;
}
@Override
public Trigger create(Trigger trigger, String headerContent) {
triggers.put(trigger.uid(), trigger);
triggerQueue.emit(trigger);
return trigger;
}
@Override
public Trigger update(Trigger trigger) {
triggers.put(trigger.uid(), trigger);

75
ui/package-lock.json generated
View File

@@ -8,7 +8,7 @@
"name": "kestra",
"version": "0.1.0",
"dependencies": {
"@kestra-io/ui-libs": "^0.0.36",
"@kestra-io/ui-libs": "^0.0.39",
"@popperjs/core": "npm:@sxzz/popperjs-es@2.11.7",
"@vue-flow/background": "^1.2.0",
"@vue-flow/controls": "1.0.6",
@@ -121,6 +121,7 @@
"cpu": [
"x64"
],
"dev": true,
"optional": true,
"os": [
"linux"
@@ -299,9 +300,9 @@
"integrity": "sha512-eF2rxCRulEKXHTRiDrDy6erMYWqNw4LPdQ8UQA4huuxaQsVeRPFl2oM8oDGxMFhJUWZf9McpLtJasDDZb/Bpeg=="
},
"node_modules/@kestra-io/ui-libs": {
"version": "0.0.36",
"resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.36.tgz",
"integrity": "sha512-yJJa0+tVlcWVllMVHoFQVrWzR7nIyF/+6aN8u+OPnMaHR0zSUx9MwaxF6u/YYPoBw6J4zq4ysn3pspq/DGB4ag==",
"version": "0.0.39",
"resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.39.tgz",
"integrity": "sha512-uX5Iqio6Ni6woUDuuEPP2fkImP6Y041FGR88Lt0Q6gfCEhxs4yW7WqCWpyvYRBHz4FT6Pb/qYscTIlXcYZIWKA==",
"peerDependencies": {
"@vue-flow/background": "^1.2.0",
"@vue-flow/controls": "1.0.6",
@@ -494,12 +495,6 @@
"integrity": "sha512-Hr5Jfhc9eYOQNPYO5WLDq/n4jqijdHNlDXjuAQkkt+mWdQR+XJToOHrsD4cPaMXpn6KO7y2+wM8AZEs8VpBLVA==",
"dev": true
},
"node_modules/@types/linkify-it": {
"version": "3.0.5",
"resolved": "https://registry.npmjs.org/@types/linkify-it/-/linkify-it-3.0.5.tgz",
"integrity": "sha512-yg6E+u0/+Zjva+buc3EIb+29XEg4wltq7cSmd4Uc2EE/1nUVmxyzpX6gUXD0V8jIrG0r7YeOGVIbYRkxeooCtw==",
"peer": true
},
"node_modules/@types/lodash": {
"version": "4.14.198",
"resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.198.tgz",
@@ -513,22 +508,6 @@
"@types/lodash": "*"
}
},
"node_modules/@types/markdown-it": {
"version": "13.0.7",
"resolved": "https://registry.npmjs.org/@types/markdown-it/-/markdown-it-13.0.7.tgz",
"integrity": "sha512-U/CBi2YUUcTHBt5tjO2r5QV/x0Po6nsYwQU4Y04fBS6vfoImaiZ6f8bi3CjTCxBPQSO1LMyUqkByzi8AidyxfA==",
"peer": true,
"dependencies": {
"@types/linkify-it": "*",
"@types/mdurl": "*"
}
},
"node_modules/@types/mdurl": {
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/@types/mdurl/-/mdurl-1.0.5.tgz",
"integrity": "sha512-6L6VymKTzYSrEf4Nev4Xa1LCHKrlTlYCBMTlQKFuddo1CvQcE52I0mwfOJayueUC7MJuXOeHTcIU683lzd0cUA==",
"peer": true
},
"node_modules/@types/minimatch": {
"version": "5.1.2",
"resolved": "https://registry.npmjs.org/@types/minimatch/-/minimatch-5.1.2.tgz",
@@ -2222,13 +2201,14 @@
}
},
"node_modules/es5-ext": {
"version": "0.10.62",
"resolved": "https://registry.npmjs.org/es5-ext/-/es5-ext-0.10.62.tgz",
"integrity": "sha512-BHLqn0klhEpnOKSrzn/Xsz2UIW8j+cGmo9JLzr8BiUapV8hPL9+FliFqjwr9ngW7jWdnxv6eO+/LqyhJVqgrjA==",
"version": "0.10.64",
"resolved": "https://registry.npmjs.org/es5-ext/-/es5-ext-0.10.64.tgz",
"integrity": "sha512-p2snDhiLaXe6dahss1LddxqEm+SkuDvV8dnIQG0MWjyHpcMNfXKPE+/Cc0y+PhxJX3A4xGNeFCj5oc0BUh6deg==",
"hasInstallScript": true,
"dependencies": {
"es6-iterator": "^2.0.3",
"es6-symbol": "^3.1.3",
"esniff": "^2.0.1",
"next-tick": "^1.1.0"
},
"engines": {
@@ -2258,6 +2238,7 @@
"version": "0.19.8",
"resolved": "https://registry.npmjs.org/esbuild/-/esbuild-0.19.8.tgz",
"integrity": "sha512-l7iffQpT2OrZfH2rXIp7/FkmaeZM0vxbxN9KfiCwGYuZqzMg/JdvX26R31Zxn/Pxvsrg3Y9N6XTcnknqDyyv4w==",
"dev": true,
"hasInstallScript": true,
"bin": {
"esbuild": "bin/esbuild"
@@ -2460,6 +2441,25 @@
"url": "https://opencollective.com/eslint"
}
},
"node_modules/esniff": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/esniff/-/esniff-2.0.1.tgz",
"integrity": "sha512-kTUIGKQ/mDPFoJ0oVfcmyJn4iBDRptjNVIzwIFR7tqWXdVI9xfA2RMwY/gbSpJG3lkdWNEjLap/NqVHZiJsdfg==",
"dependencies": {
"d": "^1.0.1",
"es5-ext": "^0.10.62",
"event-emitter": "^0.3.5",
"type": "^2.7.2"
},
"engines": {
"node": ">=0.10"
}
},
"node_modules/esniff/node_modules/type": {
"version": "2.7.2",
"resolved": "https://registry.npmjs.org/type/-/type-2.7.2.tgz",
"integrity": "sha512-dzlvlNlt6AXU7EBSfpAscydQ7gXB+pPGsPnfJnZpiNJBDj7IaJzQlBZYGdEi4R9HmPdBv2XmWJ6YUtoTa7lmCw=="
},
"node_modules/espree": {
"version": "9.6.1",
"resolved": "https://registry.npmjs.org/espree/-/espree-9.6.1.tgz",
@@ -2536,6 +2536,15 @@
"node": ">=0.10.0"
}
},
"node_modules/event-emitter": {
"version": "0.3.5",
"resolved": "https://registry.npmjs.org/event-emitter/-/event-emitter-0.3.5.tgz",
"integrity": "sha512-D9rRn9y7kLPnJ+hMq7S/nhvoKwwvVJahBi2BPmx3bvbsEdK3W9ii8cBSGjP+72/LnM4n6fo3+dkCX5FeTQruXA==",
"dependencies": {
"d": "1",
"es5-ext": "~0.10.14"
}
},
"node_modules/execa": {
"version": "7.2.0",
"resolved": "https://registry.npmjs.org/execa/-/execa-7.2.0.tgz",
@@ -2717,9 +2726,9 @@
"dev": true
},
"node_modules/follow-redirects": {
"version": "1.15.4",
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.4.tgz",
"integrity": "sha512-Cr4D/5wlrb0z9dgERpUL3LrmPKVDsETIJhaCMeDfuFYcqa5bldGV6wBsAN6X/vxlXQtFBMrXdXxdL8CbDTGniw==",
"version": "1.15.6",
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.6.tgz",
"integrity": "sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==",
"funding": [
{
"type": "individual",
@@ -5176,7 +5185,7 @@
"version": "5.3.3",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-5.3.3.tgz",
"integrity": "sha512-pXWcraxM0uxAS+tN0AG/BF2TyqmHO014Z070UsJ+pFvYuRSq8KH8DmWpnbXe0pEPDHXZV3FcAbJkijJ5oNEnWw==",
"devOptional": true,
"dev": true,
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"

View File

@@ -12,7 +12,7 @@
"lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs --fix"
},
"dependencies": {
"@kestra-io/ui-libs": "^0.0.36",
"@kestra-io/ui-libs": "^0.0.39",
"@popperjs/core": "npm:@sxzz/popperjs-es@2.11.7",
"@vue-flow/background": "^1.2.0",
"@vue-flow/controls": "1.0.6",

View File

@@ -80,23 +80,23 @@
&& isSecurityAdviceEnable) {
const checked = ref(false);
ElMessageBox({
title: "Your data is not secured",
title: this.$t("security_advice.title"),
message: () => {
return h("div", null, [
h("p", null, "Don't lose one bit. Enable our free security features"),
h("p", null, this.$t("security_advice.content")),
h(ElSwitch, {
modelValue: checked.value,
"onUpdate:modelValue": (val) => {
checked.value = val
localStorage.setItem("security.advice.show", `${!val}`)
},
activeText: "Don't show again"
activeText: this.$t("security_advice.switch_text")
}),
])
},
showCancelButton: true,
confirmButtonText: "Enabled security",
cancelButtonText: "Dismiss",
confirmButtonText: this.$t("security_advice.enable"),
cancelButtonText: this.$t("cancel"),
center: false,
showClose: false,
}).then(() => {

View File

@@ -0,0 +1,61 @@
<template>
<el-drawer
:model-value="props.modelValue"
@update:model-value="emit('update:modelValue', $event)"
destroy-on-close
lock-scroll
size=""
:append-to-body="true"
:class="{'full-screen': fullScreen}"
ref="editorDomElement"
>
<template #header>
<span>
{{ title }}
<slot name="header" />
</span>
<el-button link class="full-screen">
<Fullscreen :title="$t('toggle fullscreen')" @click="toggleFullScreen" />
</el-button>
</template>
<template #footer>
<slot name="footer" />
</template>
<template #default>
<slot />
</template>
</el-drawer>
</template>
<script setup>
import {ref} from "vue";
import Fullscreen from "vue-material-design-icons/Fullscreen.vue"
const props = defineProps({
modelValue: {
type: Boolean,
required: true
},
title: {
type: String,
required: false,
default: undefined
},
});
const emit = defineEmits(["update:modelValue"])
const fullScreen = ref(false);
const toggleFullScreen = () => {
fullScreen.value = !fullScreen.value;
}
</script>
<style scoped lang="scss">
button.full-screen {
font-size: 24px;
}
</style>

View File

@@ -99,7 +99,7 @@
title: this.title || "Error",
message: h("div", children),
position: "bottom-right",
type: "error",
type: this.message.variant,
duration: 0,
dangerouslyUseHTMLString: true,
customClass: "error-notification" + (children.length > 1 ? " large" : "")

View File

@@ -1,7 +1,7 @@
<template>
<el-tabs class="router-link" :class="{top: top}" v-model="activeName">
<el-tab-pane
v-for="tab in tabs"
v-for="tab in tabs.filter(t => !t.hidden)"
:key="tab.name"
:label="tab.title"
:name="tab.name || 'default'"
@@ -16,7 +16,7 @@
</el-tab-pane>
</el-tabs>
<section :class="containerClass">
<section v-bind="$attrs" :class="containerClass">
<component
v-bind="{...activeTab.props, ...attrsWithoutClass}"
v-on="activeTab['v-on'] ?? {}"
@@ -104,7 +104,7 @@
return {[this.activeTab.containerClass] : true};
}
return {"container" : true}
return {"container" : true, "mt-4": true};
},
activeTab() {
return this.tabs

View File

@@ -77,12 +77,6 @@
</router-link>
</template>
</el-table-column>
<el-table-column :label="$t('state')">
<template #default="scope">
<status v-if="scope.row.executionCurrentState" :status="scope.row.executionCurrentState" size="small" />
</template>
</el-table-column>
<el-table-column :label="$t('date')">
<template #default="scope">
<date-ago :inverted="true" :date="scope.row.date" />
@@ -120,6 +114,7 @@
<el-table-column column-key="disable" class-name="row-action">
<template #default="scope">
<el-switch
v-if="!scope.row.missingSource"
size="small"
:active-text="$t('enabled')"
:model-value="!scope.row.disabled"
@@ -127,6 +122,9 @@
class="switch-text"
:active-action-icon="Check"
/>
<el-tooltip v-else :content="'flow source not found'">
<AlertCircle class="trigger-issue-icon" />
</el-tooltip>
</template>
</el-table-column>
</el-table>
@@ -154,6 +152,7 @@
import action from "../../models/action";
import TopNavBar from "../layout/TopNavBar.vue";
import Check from "vue-material-design-icons/Check.vue";
import AlertCircle from "vue-material-design-icons/AlertCircle.vue";
</script>
<script>
import NamespaceSelect from "../namespace/NamespaceSelect.vue";
@@ -166,7 +165,6 @@
import RefreshButton from "../layout/RefreshButton.vue";
import DateAgo from "../layout/DateAgo.vue";
import Id from "../Id.vue";
import Status from "../Status.vue";
import {mapState} from "vuex";
export default {
@@ -178,7 +176,6 @@
SearchField,
NamespaceSelect,
DateAgo,
Status,
Id,
},
data() {
@@ -251,9 +248,21 @@
},
triggersMerged() {
return this.triggers.map(triggers => {
return {...triggers.abstractTrigger, ...triggers.triggerContext, codeDisabled: triggers.abstractTrigger.disabled}
return {
...triggers?.abstractTrigger,
...triggers.triggerContext,
codeDisabled: triggers?.abstractTrigger?.disabled,
// if we have no abstract trigger, it means that flow or trigger definition hasn't been found
missingSource: !triggers.abstractTrigger
}
})
}
}
};
</script>
</script>
<style>
.trigger-issue-icon{
color: var(--bs-warning);
font-size: 1.4em;
}
</style>

View File

@@ -28,13 +28,9 @@
</el-form-item>
</collapse>
<el-drawer
<drawer
v-if="isModalOpen"
v-model="isModalOpen"
destroy-on-close
lock-scroll
:append-to-body="true"
size=""
:title="$t('eval.title')"
>
<template #footer>
@@ -52,7 +48,7 @@
<p><strong>{{ debugError }}</strong></p>
<pre class="mb-0">{{ debugStackTrace }}</pre>
</el-alert>
</el-drawer>
</drawer>
<el-table
:data="outputsPaginated"
@@ -99,6 +95,7 @@
import Pagination from "../layout/Pagination.vue";
import {apiUrl} from "override/utils/route";
import SubFlowLink from "../flows/SubFlowLink.vue";
import Drawer from "../Drawer.vue";
export default {
components: {
@@ -107,6 +104,7 @@
VarValue,
Editor,
Collapse,
Drawer
},
data() {
return {

View File

@@ -5,13 +5,13 @@
<li v-if="isAllowedEdit">
<a :href="`${finalApiUrl}/executions/${execution.id}`" target="_blank">
<el-button :icon="Api">
{{ $t('api') }}
{{ $t("api") }}
</el-button>
</a>
</li>
<li v-if="canDelete">
<el-button :icon="Delete" @click="deleteExecution">
{{ $t('delete') }}
{{ $t("delete") }}
</el-button>
</li>
<li v-if="isAllowedEdit">
@@ -26,8 +26,13 @@
</template>
</top-nav-bar>
<template v-if="ready">
<tabs :route-name="$route.params && $route.params.id ? 'executions/update': ''" @follow="follow" :tabs="tabs" />
<tabs
:route-name="$route.params && $route.params.id ? 'executions/update': ''"
@follow="follow"
:tabs="tabs"
/>
</template>
<div v-else class="full-space" v-loading="!ready" />
</template>
<script setup>
@@ -71,7 +76,7 @@
this.follow();
window.addEventListener("popstate", this.follow)
},
mounted () {
mounted() {
this.previousExecutionId = this.$route.params.id
},
watch: {
@@ -96,24 +101,41 @@
self.closeSSE();
}
let execution = JSON.parse(event.data);
// we are receiving a first "fake" event to force initializing the connection: ignoring it
if (event.lastEventId !== "start") {
let execution = JSON.parse(event.data);
if (!this.flow ||
execution.flowId !== this.flow.id ||
execution.namespace !== this.flow.namespace ||
execution.flowRevision !== this.flow.revision
) {
this.$store.dispatch(
"flow/loadFlow",
{namespace: execution.namespace, id: execution.flowId, revision: execution.flowRevision}
);
this.$store.dispatch("flow/loadRevisions", {
namespace: execution.namespace,
id: execution.flowId
})
if (!this.flow ||
execution.flowId !== this.flow.id ||
execution.namespace !== this.flow.namespace ||
execution.flowRevision !== this.flow.revision
) {
this.$store.dispatch(
"flow/loadFlow",
{
namespace: execution.namespace,
id: execution.flowId,
revision: execution.flowRevision
}
);
this.$store.dispatch("flow/loadRevisions", {
namespace: execution.namespace,
id: execution.flowId
})
}
this.$store.commit("execution/setExecution", execution);
}
this.$store.commit("execution/setExecution", execution);
}
// sse.onerror doesnt return the details of the error
// but as our emitter can only throw an error on 404
// we can safely assume that the error
this.sse.onerror = () => {
this.$store.dispatch("core/showMessage", {
variant: "error",
title: this.$t("error"),
message: this.$t("errors.404.flow or execution"),
});
}
});
},
@@ -159,12 +181,14 @@
];
},
editFlow() {
this.$router.push({name:"flows/update", params: {
namespace: this.$route.params.namespace,
id: this.$route.params.flowId,
tab: "editor",
tenant: this.$route.params.tenant
}})
this.$router.push({
name: "flows/update", params: {
namespace: this.$route.params.namespace,
id: this.$route.params.flowId,
tab: "editor",
tenant: this.$route.params.tenant
}
})
},
deleteExecution() {
if (this.execution) {
@@ -268,3 +292,8 @@
}
};
</script>
<style>
.full-space {
flex: 1 1 auto;
}
</style>

View File

@@ -123,7 +123,7 @@
stripe
table-layout="auto"
fixed
@row-dblclick="onRowDoubleClick"
@row-dblclick="row => onRowDoubleClick(executionParams(row))"
@sort-change="onSort"
@selection-change="handleSelectionChange"
:selectable="!hidden?.includes('selection') && canCheck"
@@ -184,7 +184,7 @@
:label="$t('id')"
>
<template #default="scope">
<id :value="scope.row.id" :shrink="true" @click="onRowDoubleClick(scope.row)" />
<id :value="scope.row.id" :shrink="true" @click="onRowDoubleClick(executionParams(scope.row))" />
</template>
</el-table-column>
@@ -568,6 +568,13 @@
}
},
methods: {
executionParams(row) {
return {
namespace: row.namespace,
flowId: row.flowId,
id: row.id
}
},
onDisplayColumnsChange(event) {
localStorage.setItem(this.storageKey, event);
this.displayColumns = event;

View File

@@ -2,16 +2,12 @@
<el-button size="small" type="primary" :icon="EyeOutline" @click="getFilePreview">
Preview
</el-button>
<el-drawer
<drawer
v-if="selectedPreview === value && filePreview"
v-model="isPreviewOpen"
destroy-on-close
lock-scroll
size=""
:append-to-body="true"
>
<template #header>
<h3>{{ $t("preview") }}</h3>
{{ $t("preview") }}
</template>
<template #default>
<el-alert v-if="filePreview.truncated" show-icon type="warning" :closable="false" class="mb-2">
@@ -58,7 +54,7 @@
</el-form-item>
</el-form>
</template>
</el-drawer>
</drawer>
</template>
<script setup>
@@ -70,9 +66,10 @@
import ListPreview from "../ListPreview.vue";
import {mapGetters, mapState} from "vuex";
import Markdown from "../layout/Markdown.vue";
import Drawer from "../Drawer.vue";
export default {
components: {Markdown, ListPreview, Editor},
components: {Markdown, ListPreview, Editor, Drawer},
props: {
value: {
type: String,

View File

@@ -6,17 +6,13 @@
{{ $t('metrics') }}
</el-dropdown-item>
<el-drawer
<drawer
v-if="isOpen"
v-model="isOpen"
:title="$t('metrics')"
destroy-on-close
:append-to-body="true"
size=""
direction="ltr"
>
<metrics-table ref="table" :task-run-id="taskRun.id" :execution="execution" />
</el-drawer>
</drawer>
</template>
<script setup>
@@ -26,10 +22,12 @@
<script>
import MetricsTable from "./MetricsTable.vue";
import Drawer from "../Drawer.vue";
export default {
components: {
MetricsTable
MetricsTable,
Drawer
},
data() {
return {

View File

@@ -86,7 +86,7 @@
return {
loadInit: false,
metrics: undefined,
metricsTotal: undefined
metricsTotal: 0
};
},
props: {

View File

@@ -7,21 +7,17 @@
{{ $t('outputs') }}
</el-dropdown-item>
<el-drawer
<drawer
v-if="isOpen"
v-model="isOpen"
:title="$t('outputs')"
destroy-on-close
:append-to-body="true"
size=""
direction="ltr"
>
<vars
:execution="execution"
class="table-unrounded mt-1"
:data="outputs"
/>
</el-drawer>
</drawer>
</template>
<script setup>
@@ -30,10 +26,12 @@
<script>
import Vars from "../executions/Vars.vue";
import Drawer from "../Drawer.vue";
export default {
components: {
Vars,
Drawer,
},
props: {
outputs: {

View File

@@ -48,7 +48,7 @@
<div v-if="execution.trigger" class="mt-4">
<h5>{{ $t("trigger") }}</h5>
<vars :execution="execution" :data="execution.trigger" />
<vars :execution="execution" :data="triggerVariables" />
</div>
<div v-if="execution.inputs" class="mt-4">
@@ -183,6 +183,14 @@
})
})
return inputs;
},
// This is used to display correctly trigger variables
triggerVariables() {
let trigger = this.execution.trigger
trigger["trigger"] = this.execution.trigger.variables
delete trigger["variables"]
return trigger
}
},
};

View File

@@ -1,5 +1,6 @@
<template>
<el-tooltip
v-if="isReplay || enabled"
:persistent="false"
transition=""
:hide-after="0"
@@ -11,13 +12,13 @@
:is="component"
:icon="!isReplay ? RestartIcon : PlayBoxMultiple"
@click="isOpen = !isOpen"
v-if="component !== 'el-dropdown-item' && (isReplay || enabled)"
v-if="component !== 'el-dropdown-item'"
:disabled="!enabled"
:class="!isReplay ? 'restart me-1' : ''"
>
{{ $t(replayOrRestart) }}
</component>
<span v-else-if="component === 'el-dropdown-item' && (isReplay || enabled)">
<span v-else-if="component === 'el-dropdown-item'">
<component
:is="component"
:icon="!isReplay ? RestartIcon : PlayBoxMultiple"
@@ -48,7 +49,7 @@
<p v-html="$t(replayOrRestart + ' confirm', {id: execution.id})" />
<el-form>
<el-form v-if="revisionsOptions && revisionsOptions.length > 1">
<p class="text-muted">
{{ $t("restart change revision") }}
</p>

View File

@@ -210,18 +210,28 @@
this.sseBySubflow[subflow] = sse;
sse.onmessage = (event) => {
if (event && event.lastEventId === "end") {
sse.close();
this.closeSubExecutionSSE(subflow);
}
const previousExecution = this.subflowsExecutions[subflow];
this.$store.commit("execution/addSubflowExecution", {subflow, execution: JSON.parse(event.data)});
// we are receiving a first "fake" event to force initializing the connection: ignoring it
if (event.lastEventId !== "start") {
const previousExecution = this.subflowsExecutions[subflow];
this.$store.commit("execution/addSubflowExecution", {subflow, execution: JSON.parse(event.data)});
// add subflow execution id to graph
if(previousExecution === undefined) {
this.loadGraph(true);
// add subflow execution id to graph
if(previousExecution === undefined) {
this.loadGraph(true);
}
}
};
});
},
closeSubExecutionSSE(subflow) {
const sse = this.sseBySubflow[subflow];
if (sse) {
sse.close();
delete this.sseBySubflow[subflow];
}
}
}
};

View File

@@ -1,8 +1,8 @@
<template>
<el-table stripe table-layout="auto" fixed :data="variables">
<el-table-column prop="key" rowspan="3" :label="$t('name')">
<el-table-column prop="key" min-width="500" :label="$t('name')">
<template #default="scope">
<code>{{ scope.row.key }}</code>
<code class="key-col">{{ scope.row.key }}</code>
</template>
</el-table-column>
@@ -50,3 +50,8 @@
},
};
</script>
<style>
.key-col {
min-width: 200px;
}
</style>

View File

@@ -4,10 +4,10 @@
@change="onSelectedFilterType()"
class="filter"
>
<el-radio-button :label="filterType.RELATIVE">
<el-radio-button :value="filterType.RELATIVE">
{{ $t("relative") }}
</el-radio-button>
<el-radio-button :label="filterType.ABSOLUTE">
<el-radio-button :value="filterType.ABSOLUTE">
{{ $t("absolute") }}
</el-radio-button>
</el-radio-group>
@@ -38,11 +38,6 @@
"update:filterValue"
],
created() {
this.filterType = {
RELATIVE: "REL",
ABSOLUTE: "ABS"
};
this.selectedFilterType = (this.$route.query.startDate || this.$route.query.endDate) ? this.filterType.ABSOLUTE : this.filterType.RELATIVE;
},
mounted() {
@@ -50,7 +45,11 @@
},
data() {
return {
selectedFilterType: undefined
selectedFilterType: undefined,
filterType: {
RELATIVE: "REL",
ABSOLUTE: "ABS"
}
}
},
computed: {

Some files were not shown because too many files have changed in this diff Show More