Compare commits

...

111 Commits

Author SHA1 Message Date
nKwiatkowski
709ac37773 Merge branch 'develop' into fix/failin_execution_test
# Conflicts:
#	webserver/src/test/java/io/kestra/webserver/controllers/api/ExecutionControllerRunnerTest.java
2025-08-04 11:37:46 +02:00
YannC.
f35a0b6d60 fix: add missing webhook releases secrets for github releases 2025-08-01 23:21:27 +02:00
brian.mulier
0c9ed17f1c fix(core): remove icon for inputs in no-code
closes #10520
2025-08-01 16:32:08 +02:00
brian.mulier
7ca20371f8 fix(executions): avoid race condition leading to never-ending follow with non-terminal state 2025-08-01 13:12:14 +02:00
brian.mulier
8ff3454cbd fix(core): ensure instances can read all messages when no consumer group / queue type 2025-08-01 13:12:14 +02:00
Piyush Bhaskar
09593d9fd2 fix(namespaces): fixes loading of additional ns (#10518) 2025-08-01 16:28:01 +05:30
Loïc Mathieu
d3cccf36f0 feat(flow): pull up description to the FlowInterface
This avoids the need to parse the flow, e.g. by AI, to get the description.
2025-08-01 12:43:49 +02:00
Loïc Mathieu
eeb91cd9ed fix(tests): RunContextLoggerTest.secrets(): wrong number of logs in awaitLogs() 2025-08-01 12:41:41 +02:00
Loïc Mathieu
2679b0f067 feat(flows): warn on runnable only properties on non-runnable tasks
Closes #9967
Closes #10500
2025-08-01 12:41:08 +02:00
Piyush Bhaskar
54281864c8 fix(executions): do not rely on monaco to get value (#10515) 2025-08-01 13:23:43 +05:30
Loïc Mathieu
e4f9b11d0c fix(ci): workflow build artifact doesn't need the plugin version 2025-08-01 09:41:48 +02:00
Barthélémy Ledoux
12cef0593c fix(flows): playground need to use ui-libs (#10506) 2025-08-01 09:06:11 +02:00
Piyush Bhaskar
c6cf8f307f fix(flows): route to flow page (#10514) 2025-08-01 12:10:56 +05:30
Piyush Bhaskar
3b4eb55f84 fix(executions): properly handle methods and computed for tabs (#10513) 2025-08-01 12:10:27 +05:30
YannC
d32949985d fix: handle empty flows list in lastExecutions correctly (#10493) 2025-08-01 07:21:00 +02:00
YannC
c051ca2e66 fix(ui): load correctly filters + refresh dashboard on filter change (#10504) 2025-08-01 07:15:46 +02:00
Piyush Bhaskar
93a456963b fix(editor): adjust padding for editor (#10497)
* fix(editor): adjust padding for editor

* fix: make padding 16px
2025-07-31 19:10:46 +05:30
YannC.
9a45f17680 fix(ci): do not run github release on tag 2025-07-31 14:37:51 +02:00
github-actions[bot]
5fb6806d74 chore(core): localize to languages other than english (#10494)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 17:44:10 +05:30
Barthélémy Ledoux
f3cff72edd fix(flows): forget all old taskRunId when a new execution (#10487) 2025-07-31 13:41:57 +02:00
Barthélémy Ledoux
0abc660e7d fix(flows): wait longer for widgets to be rendered (#10485) 2025-07-31 13:41:46 +02:00
Barthélémy Ledoux
f09ca3d92e fix(flows): load flows documentation when coming back to no-code root (#10374) 2025-07-31 13:41:36 +02:00
YannC
9fd778fca1 feat(ui): added http method autocompletion (#10492) 2025-07-31 13:28:59 +02:00
Loïc Mathieu
667af25e1b fix(executions): Don't create outputs from the Subflow task when we didn't wait
As, well, if we didn't wait for the subflow execution, we cannot have access to its outputs.
2025-07-31 13:06:58 +02:00
github-actions[bot]
1b1aed5ff1 chore(core): localize to languages other than english (#10489)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 12:14:37 +02:00
Barthélémy Ledoux
da1bb58199 fix(flows): add the load errors to the flow errors (#10483) 2025-07-31 11:53:43 +02:00
Loïc Mathieu
d3e661f9f8 feat(system): improve performance of computeSchedulable
- Store flowIds in a list to avoid computing them multiple times
- Store triggers by ID in a map to avoid iterating the list of triggers for each flow
2025-07-31 11:35:01 +02:00
yuri1969
2126c8815e feat(core): validate URL configuration
Used the `ServerCommandValidator` style.

BREAKING CHANGE: app won't start due to an invalid `kestra.url`
2025-07-31 11:24:21 +02:00
yuri1969
6cfc5b8799 fix(build): reduce Gradle warnings 2025-07-31 11:21:01 +02:00
Barthélémy Ledoux
16d44034f0 fix(flows): hide executionkind meta in the logs (#10482) 2025-07-31 10:50:34 +02:00
Barthélémy Ledoux
f76e62a4af fix(executions): do not rely on monaco to get value (#10467) 2025-07-31 09:28:33 +02:00
Piyush Bhaskar
f6645da94c fix(core): remove top spacing from no execution page and removing the redundant code (#10445)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-31 12:03:58 +05:30
github-actions[bot]
93b2bbf0d0 chore(core): localize to languages other than english (#10471)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 08:23:08 +02:00
Piyush Bhaskar
9d46e2aece fix(executions): make columns that are not links normal text (#10460)
* fix(executions): make it normal text

* fix(executions): use monospace font only
2025-07-31 10:33:33 +05:30
brian.mulier
133315a2a5 chore(deps): hardcode vue override version 2025-07-30 19:25:50 +02:00
brian.mulier
b96b9bb414 fix(core): avoid follow execution from being discarded too early
closes #10472
closes #7623
2025-07-30 19:25:50 +02:00
Barthélémy Ledoux
9865d8a7dc fix(flows): playground - implement new designs (#10459)
Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-07-30 17:54:46 +02:00
brian-mulier-p
29f22c2f81 fix(core): redesign playground run task button (#10423)
closes #10389
2025-07-30 15:23:33 +02:00
dependabot[bot]
3e69469381 build(deps): bump net.thisptr:jackson-jq from 1.3.0 to 1.4.0
Bumps [net.thisptr:jackson-jq](https://github.com/eiiches/jackson-jq) from 1.3.0 to 1.4.0.
- [Release notes](https://github.com/eiiches/jackson-jq/releases)
- [Commits](https://github.com/eiiches/jackson-jq/compare/1.3.0...1.4.0)

---
updated-dependencies:
- dependency-name: net.thisptr:jackson-jq
  dependency-version: 1.4.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-07-30 15:08:39 +02:00
dependabot[bot]
38c24ccf7f build(deps): bump software.amazon.awssdk:bom from 2.32.6 to 2.32.11
Bumps software.amazon.awssdk:bom from 2.32.6 to 2.32.11.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.32.11
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-07-30 15:07:49 +02:00
Loïc Mathieu
12cf41a309 fix(ci): don't publish docker in build-artifact 2025-07-30 14:42:16 +02:00
Malaydewangan09
7b8ea0d885 feat(plugins): add script plugins 2025-07-30 17:27:48 +05:30
Barthélémy Ledoux
cf88bbcb12 fix(flows): playground align restart button button (#10415) 2025-07-30 11:57:24 +02:00
Loïc Mathieu
6abe7f96e7 fix(ci): add missing build artifact job 2025-07-30 11:47:10 +02:00
Loïc Mathieu
e73ac78d8b build(ci): allow downloading the exe from the workflow and not the release
This would allow running the workflow even if the release step fail
2025-07-30 11:23:43 +02:00
François Delbrayelle
b0687eb702 fix(): fix icons 2025-07-30 10:28:10 +02:00
weibo1
85f9070f56 feat: Trigger Initialization Method Performance Optimization 2025-07-30 09:23:48 +02:00
YannC
0a42ab40ec fix(dashboard): pageSize & pageNumber are now correctly passed when fetching a chart (#10413) 2025-07-30 08:45:20 +02:00
Piyush Bhaskar
856d2d1d51 refactor(flows): remove execution chart (#10425) 2025-07-30 11:54:35 +05:30
YannC.
a7d6dbc8a3 feat(ci): allow to run github release ci on dispatch 2025-07-29 15:04:50 +02:00
YannC.
cf82109da6 fix(ci): correctly pass GH token to release workflow 2025-07-29 15:01:36 +02:00
Barthélémy Ledoux
d4168ba424 fix(flows): playground clear current execution when clearExecutions() (#10414) 2025-07-29 14:43:11 +02:00
Loïc Mathieu
46a294f25a chore(version): upgrade to v1.0.0-SNAPSHOT 2025-07-29 14:23:19 +02:00
Loïc Mathieu
a229036d8d chore(version): update to version 'v0.24.0-rc0-SNAPSHOT'. 2025-07-29 14:21:49 +02:00
François Delbrayelle
a518fefecd feat(plugins): add plugin-deepseek 2025-07-29 13:58:11 +02:00
Barthélémy Ledoux
1d3210fd7d fix(flows): remove text from warning button (#10370) 2025-07-29 11:27:37 +02:00
brian-mulier-p
597f84ecb7 fix(core): topology was no longer working on new flows (#10411)
closes #10354
2025-07-29 11:19:05 +02:00
Barthélémy Ledoux
5f3c7ac9f0 fix(core): allow icons api call to take longer than local call (#10412) 2025-07-29 11:13:12 +02:00
Nicolas K.
77c4691b04 fix(tests): rework basic auth service test (#10409)
* fix(tests): rework basic auth service test

* fix(tests): clean basic auth service test

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-29 10:59:43 +02:00
github-actions[bot]
6d34416529 chore(core): localize to languages other than english (#10410)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-29 13:55:25 +05:30
Piyush Bhaskar
40a67d5dcd feat(flows): add webhook curl and json field (#10392) 2025-07-29 13:48:28 +05:30
Loïc Mathieu
2c68c704f6 fix(test): flaky test JdbcQueueTest.withGroupAndType() 2025-07-29 09:35:28 +02:00
Miloš Paunović
e59d9f622c chore(namespaces): properly handle file name field on flow run dialog if set from defaults (#10390)
Closes https://github.com/kestra-io/kestra/issues/10365.
2025-07-29 08:29:08 +02:00
Piyush Bhaskar
c951ba39a7 fix(core): make validation less aggressive (#10406) 2025-07-29 11:41:38 +05:30
github-actions[bot]
a0200cfacb chore(core): localize to languages other than english (#10405)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-28 20:50:23 +02:00
dependabot[bot]
c6310f0697 build(deps): bump com.github.ben-manes.caffeine:caffeine
Bumps [com.github.ben-manes.caffeine:caffeine](https://github.com/ben-manes/caffeine) from 3.2.1 to 3.2.2.
- [Release notes](https://github.com/ben-manes/caffeine/releases)
- [Commits](https://github.com/ben-manes/caffeine/compare/v3.2.1...v3.2.2)

---
updated-dependencies:
- dependency-name: com.github.ben-manes.caffeine:caffeine
  dependency-version: 3.2.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-07-28 17:28:31 +02:00
dependabot[bot]
21ba59a525 build(deps): bump org.postgresql:postgresql in the gradle group
Bumps the gradle group with 1 update: [org.postgresql:postgresql](https://github.com/pgjdbc/pgjdbc).


Updates `org.postgresql:postgresql` from 42.7.6 to 42.7.7
- [Release notes](https://github.com/pgjdbc/pgjdbc/releases)
- [Changelog](https://github.com/pgjdbc/pgjdbc/blob/master/CHANGELOG.md)
- [Commits](https://github.com/pgjdbc/pgjdbc/compare/REL42.7.6...REL42.7.7)

---
updated-dependencies:
- dependency-name: org.postgresql:postgresql
  dependency-version: 42.7.7
  dependency-type: direct:production
  dependency-group: gradle
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-07-28 17:26:23 +02:00
Barthélémy Ledoux
4f9e3cd06c fix(flows): bring back alignment on overview (#10369) 2025-07-28 17:22:55 +02:00
Barthélémy Ledoux
e74010d1a4 fix(flows): playground - use all tasks as breakpoints (#10399) 2025-07-28 17:22:23 +02:00
Barthélémy Ledoux
465e6467e9 chore: update ui-libs (#10391) 2025-07-28 17:12:56 +02:00
YannC
c68c1b16d9 fix: set postgres and mysql queue offset as a bigint (#10344) 2025-07-28 16:28:09 +02:00
YannC
468c32156e chore: enforce micronaut open api version until Micronaut Platform works (#10393) 2025-07-28 16:27:59 +02:00
Loïc Mathieu
6e0a1c61ef fix(tests): increase the sleep inside ExecutionControllerRunnerTest.triggerExecutionAndFollowDependencies()
I'm not happy with that but I ran 3x 100 repetitions and all passed
2025-07-28 16:23:40 +02:00
Loïc Mathieu
552d55ef6b fix(test): RestactCaseTest.restartFailedWithFinally() should use executionService.isTerminated() 2025-07-28 16:23:40 +02:00
skayliu
08b0b682bf refactor(pebble): add more timestamp data time format 2025-07-28 16:17:55 +02:00
ben8t
cff90c93bb feat(plugin): add Notion plugin (#10049) 2025-07-28 16:09:01 +02:00
Roman Acevedo
ea465056d0 fix(triggers): bulk action on triggers did not take into account this is async (#10307) 2025-07-28 15:21:03 +02:00
Piyush Bhaskar
02f150f0b0 fix(core): animation on ai agent button (#10379)
* fix(core): animation on ai agent button

* refactor(ai): add AITriggerButton component.
2025-07-28 18:48:34 +05:30
Loïc Mathieu
95d95d3d3c fix(tests): fix assertions in DateFilterTest.now() 2025-07-28 14:51:41 +02:00
Nicolas K.
6b8d3d6928 fix(tests): flaky test with wire mock not starting fast enough (#10383)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-07-28 14:07:48 +02:00
Piyush Bhaskar
1e347073ca fix(plugins): go to list of plugins from sidebar (#10385)
* fix(plugins): go to list of plugins from sidebar

* fix: update route to use path.
2025-07-28 17:24:09 +05:30
Loïc Mathieu
ac09dcecd9 fix(flows): wrong warning in FILE input
As we deprecate the `extension` property but still have a default for it, the warning is always displayed.
Removing the default and applying only when needed fixes the issue.

Fixes #10361
2025-07-28 13:40:11 +02:00
Miloš Paunović
40b337cd22 fix(namespaces): make sure the namespace parameter is properly passed when reading a file (#10384)
Relates to https://github.com/kestra-io/kestra/issues/10363.
Relates to https://github.com/kestra-io/kestra-ee/issues/4514.
2025-07-28 12:35:44 +02:00
Karthik D
5377d16036 chore(plugins): simplify the plugins search field (#10373)
Closes https://github.com/kestra-io/kestra/issues/10300.

Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-28 12:15:08 +02:00
Barthélémy Ledoux
f717bc413f tests: update frontend-tests (#10380) 2025-07-28 12:02:32 +02:00
Loïc Mathieu
d6bed2d235 fix(flows): file watching flow should delete the flow when the file is deleted while reading for update
When you use flow watching, a file can be updated then deleted, if it occurs quickly you could have a modified event, read the file then have a file not found exception as it has been deleted.
We should delete the flow in this case.

This has been detected because the related tests are flaky; doing that reduces flakiness of the related tests
2025-07-28 12:00:30 +02:00
Loïc Mathieu
07fd74b238 fix(test): flaky ExecutionControllerRunnerTest.triggerExecutionAndFollowDependencies() 2025-07-28 11:50:57 +02:00
Loïc Mathieu
60eef29de2 fix(tests): await for terminated execution instead of sleep in ExecutionControllerRunnerTest.killByIdShouldFailed() 2025-07-28 11:50:57 +02:00
Loïc Mathieu
20ca7b6380 fix(tests): increase wait time to 10 in LogConsumerTests.logs()
This should reduce flakiness
2025-07-28 11:50:57 +02:00
Piyush Bhaskar
9d82df61c6 Revert "fix(core): fixes cumbersome operator selection (#10322)" (#10377)
This reverts commit e78210b5eb.
2025-07-28 14:52:58 +05:30
Piyush Bhaskar
e78210b5eb fix(core): fixes cumbersome operator selection (#10322) 2025-07-28 14:29:42 +05:30
Miloš Paunović
83143fae83 fix(executions): add default icons to execution dependency view (#10375)
Closes https://github.com/kestra-io/kestra/issues/10327.
2025-07-28 10:58:12 +02:00
Malay Dewangan
25f5ccc6b5 fix(runner): support floating-point CPU values in Docker runner (#10366) 2025-07-28 14:21:23 +05:30
Piyush Bhaskar
cf3e49a284 fix(core): give height to tooltip and use ks tokens (#10372) 2025-07-28 13:49:42 +05:30
JAGADEESH E
9a72d378df chore(core): amend missing property causing console warning in the settings page (#9788)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-28 09:49:14 +02:00
Loïc Mathieu
752a927fac feat(logs): allow LogShipper to filters on flowId and executionId
Closes https://github.com/kestra-io/kestra-ee/issues/4468
2025-07-28 09:40:36 +02:00
yuri
4053392921 fix(core): amend misc label-related issues (#10044)
* fix(core): amend misc label-related issues

* re-enabled bulk update of label value
* re-enabled merging flow-execution labels by key
* made duplicated keys rejection readable
* forced multiple validations within `RequestUtils`
* ensured existing labels can be overriden
* added multiple tests validating complex scenarios

BREAKING CHANGE: switched from first to last label value override
BREAKING CHANGE: preventing empty key/value labels
BREAKING CHANGE: preventing whitespace in key

* fix(core): reflect feedback

* Deduplicated a list inside the `Labels` task.
* Worked around label mutation at `Worker`.
* Attempted to deduplicate labels within `Execution` as possible.

* fix(core): remove irrelevant changes
2025-07-28 09:38:38 +02:00
Barthélémy Ledoux
8b0483643a feat(flows): code editor can launch playground (#10359) 2025-07-28 09:15:39 +02:00
Piyush Bhaskar
5feeb41c7a fix(core): update state count emission and filter table executions. (#10367) 2025-07-28 12:42:20 +05:30
github-actions[bot]
d7f5e5c05d chore(core): localize to languages other than english (#10368)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-28 09:09:32 +02:00
Aditya
4840f723fc chore(core): properly handle environment name set either via config and through the settings page (#10151)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-28 09:06:34 +02:00
Devesh Kumar
8cf159b281 fix(namespaces): prevent namespace folder highlighting when containing file is selected (#10364)
Closes https://github.com/kestra-io/kestra/issues/10360.
2025-07-28 08:47:25 +02:00
Loïc Mathieu
4c79576113 fix(tests): improve JdbcQueueTest flaky tests 2025-07-25 12:50:27 +02:00
Florian Hussonnois
f87f2ed753 fix(system): avoid potential NPE in ServiceLivenessManager (#10338)
Avoid a potential NPE in ServiceLivenessManager when
a local service is unregistered during shutdown before the liveness probe completes

Fix: #10338
2025-07-25 12:33:18 +02:00
Florian Hussonnois
298a6c7ca8 fix(system): ignore state transition failure for indexer
Fix: kestra-io/kestra-ee#4474
2025-07-25 11:35:09 +02:00
Loïc Mathieu
ab464fff6e fix(executions): flow concurrency limit not honored when executions are created at a high rate
This is due to the fact that we now process the execution queue concurrently so there is a race when counting currently running executions. This can be seen easily using a ForEachItem as it could create tens or hundreds of executions almost instantly leading to almost all those executions started as they would all see 0 executions running...

Using a dedicated execution running queue, as done in EE, would serialize the messages and fix the issue.

However, if using multiple executor instances and concurrency limit = 1, there is a theoretical race as no locks will be done if no execution is running. A max surge of executions could be as high as the number of executor but this race is less probable to happen in real world scenario.

Fixes #10167
2025-07-25 11:35:00 +02:00
Florian Hussonnois
6dcba16314 chore(core): clean QueryFilter class 2025-07-25 11:34:09 +02:00
Barthélémy Ledoux
80a328e87e fix(flows): better loading pattern (#10345) 2025-07-25 10:14:07 +02:00
Loïc Mathieu
f2034f4975 fix(executions): race condition inside nested ForEach with concurrency
Fixes #10167
2025-07-25 09:45:29 +02:00
github-actions[bot]
edca56d168 chore(core): localize to languages other than english (#10341)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-24 23:21:03 +02:00
nKwiatkowski
c55dedcc56 fix(tests): failing execution controller test 2025-07-23 16:11:55 +02:00
160 changed files with 4069 additions and 1677 deletions

View File

@@ -20,6 +20,15 @@ on:
required: false
type: string
default: "LATEST"
force-download-artifact:
description: 'Force download artifact'
required: false
type: string
default: "true"
options:
- "true"
- "false"
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
jobs:
@@ -38,9 +47,18 @@ jobs:
id: plugins
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
# ********************************************************************************************************************
# Build
# ********************************************************************************************************************
build-artifacts:
name: Build Artifacts
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
uses: ./.github/workflows/workflow-build-artifacts.yml
docker:
name: Publish Docker
needs: [ plugins ]
needs: [ plugins, build-artifacts ]
runs-on: ubuntu-latest
strategy:
matrix:
@@ -73,14 +91,27 @@ jobs:
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# Download release
- name: Download release
# [workflow_dispatch]
# Download executable from GitHub Release
- name: Artifacts - Download release (workflow_dispatch)
id: download-github-release
if: github.event_name == 'workflow_dispatch' && github.event.inputs.force-download-artifact == 'false'
uses: robinraju/release-downloader@v1.12
with:
tag: ${{steps.vars.outputs.tag}}
fileName: 'kestra-*'
out-file-path: build/executable
# [workflow_call]
# Download executable from artifact
- name: Artifacts - Download executable
if: github.event_name != 'workflow_dispatch' || steps.download-github-release.outcome == 'skipped'
uses: actions/download-artifact@v4
with:
name: exe
path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra

View File

@@ -43,7 +43,8 @@ jobs:
SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
end:
runs-on: ubuntu-latest
needs:

View File

@@ -1,23 +1,7 @@
name: Build Artifacts
on:
workflow_call:
inputs:
plugin-version:
description: "Kestra version"
default: 'LATEST'
required: true
type: string
outputs:
docker-tag:
value: ${{ jobs.build.outputs.docker-tag }}
description: "The Docker image Tag for Kestra"
docker-artifact-name:
value: ${{ jobs.build.outputs.docker-artifact-name }}
description: "The GitHub artifact containing the Kestra docker image name."
plugins:
value: ${{ jobs.build.outputs.plugins }}
description: "The Kestra plugins list used for the build."
workflow_call: {}
jobs:
build:
@@ -82,55 +66,6 @@ jobs:
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker Tag
- name: Setup - Docker vars
id: vars
shell: bash
run: |
TAG=${GITHUB_REF#refs/*/}
if [[ $TAG = "master" ]]
then
TAG="latest";
elif [[ $TAG = "develop" ]]
then
TAG="develop";
elif [[ $TAG = v* ]]
then
TAG="${TAG}";
else
TAG="build-${{ github.run_id }}";
fi
echo "tag=${TAG}" >> $GITHUB_OUTPUT
echo "artifact=docker-kestra-${TAG}" >> $GITHUB_OUTPUT
# Docker setup
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Docker - Setup Buildx
uses: docker/setup-buildx-action@v3
# Docker Build
- name: Docker - Build & export image
uses: docker/build-push-action@v6
if: "!startsWith(github.ref, 'refs/tags/v')"
with:
context: .
push: false
file: Dockerfile
tags: |
kestra/kestra:${{ steps.vars.outputs.tag }}
build-args: |
KESTRA_PLUGINS=${{ steps.plugins.outputs.plugins }}
APT_PACKAGES=${{ env.DOCKER_APT_PACKAGES }}
PYTHON_LIBRARIES=${{ env.DOCKER_PYTHON_LIBRARIES }}
outputs: type=docker,dest=/tmp/${{ steps.vars.outputs.artifact }}.tar
# Upload artifacts
- name: Artifacts - Upload JAR
uses: actions/upload-artifact@v4
@@ -143,10 +78,3 @@ jobs:
with:
name: exe
path: build/executable/
- name: Artifacts - Upload Docker
uses: actions/upload-artifact@v4
if: "!startsWith(github.ref, 'refs/tags/v')"
with:
name: ${{ steps.vars.outputs.artifact }}
path: /tmp/${{ steps.vars.outputs.artifact }}.tar

View File

@@ -1,14 +1,17 @@
name: Github - Release
on:
workflow_dispatch:
workflow_call:
secrets:
GH_PERSONAL_TOKEN:
description: "The Github personal token."
required: true
push:
tags:
- '*'
SLACK_RELEASES_WEBHOOK_URL:
description: "The Slack webhook URL."
required: true
jobs:
publish:

View File

@@ -41,8 +41,6 @@ jobs:
name: Build Artifacts
if: ${{ github.event.inputs.force-download-artifact == 'true' }}
uses: ./.github/workflows/workflow-build-artifacts.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
# ********************************************************************************************************************
# Docker
# ********************************************************************************************************************

View File

@@ -42,12 +42,16 @@ on:
SONATYPE_GPG_FILE:
description: "The Sonatype GPG file."
required: true
GH_PERSONAL_TOKEN:
description: "GH personal Token."
required: true
SLACK_RELEASES_WEBHOOK_URL:
description: "Slack webhook for releases channel."
required: true
jobs:
build-artifacts:
name: Build - Artifacts
uses: ./.github/workflows/workflow-build-artifacts.yml
with:
plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
Docker:
name: Publish Docker
@@ -77,4 +81,5 @@ jobs:
if: startsWith(github.ref, 'refs/tags/v')
uses: ./.github/workflows/workflow-github-release.yml
secrets:
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}

View File

@@ -26,6 +26,7 @@
#plugin-debezium:io.kestra.plugin:plugin-debezium-oracle:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-postgres:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-sqlserver:LATEST
#plugin-deepseek:io.kestra.plugin:plugin-deepseek:LATEST
#plugin-docker:io.kestra.plugin:plugin-docker:LATEST
#plugin-elasticsearch:io.kestra.plugin:plugin-elasticsearch:LATEST
#plugin-fivetran:io.kestra.plugin:plugin-fivetran:LATEST
@@ -86,13 +87,18 @@
#plugin-powerbi:io.kestra.plugin:plugin-powerbi:LATEST
#plugin-pulsar:io.kestra.plugin:plugin-pulsar:LATEST
#plugin-redis:io.kestra.plugin:plugin-redis:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-bun:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-deno:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-go:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-groovy:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-jbang:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-julia:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-jython:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-lua:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-nashorn:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-node:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-perl:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-php:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-powershell:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-python:LATEST
#plugin-scripts:io.kestra.plugin:plugin-script-r:LATEST

View File

@@ -225,14 +225,14 @@ subprojects {
}
testlogger {
theme 'mocha-parallel'
showExceptions true
showFullStackTraces true
showCauses true
slowThreshold 2000
showStandardStreams true
showPassedStandardStreams false
showSkippedStandardStreams true
theme = 'mocha-parallel'
showExceptions = true
showFullStackTraces = true
showCauses = true
slowThreshold = 2000
showStandardStreams = true
showPassedStandardStreams = false
showSkippedStandardStreams = true
}
}
}
@@ -410,7 +410,7 @@ jar {
shadowJar {
archiveClassifier.set(null)
mergeServiceFiles()
zip64 true
zip64 = true
}
distZip.dependsOn shadowJar
@@ -427,8 +427,8 @@ def executableDir = layout.buildDirectory.dir("executable")
def executable = layout.buildDirectory.file("executable/${project.name}-${project.version}").get().asFile
tasks.register('writeExecutableJar') {
group "build"
description "Write an executable jar from shadow jar"
group = "build"
description = "Write an executable jar from shadow jar"
dependsOn = [shadowJar]
final shadowJarFile = tasks.shadowJar.outputs.files.singleFile
@@ -454,8 +454,8 @@ tasks.register('writeExecutableJar') {
}
tasks.register('executableJar', Zip) {
group "build"
description "Zip the executable jar"
group = "build"
description = "Zip the executable jar"
dependsOn = [writeExecutableJar]
archiveFileName = "${project.name}-${project.version}.zip"

View File

@@ -162,7 +162,15 @@ public class FileChangedEventListener {
}
} catch (NoSuchFileException e) {
log.error("File not found: {}", entry, e);
log.warn("File not found: {}, deleting it", entry, e);
// the file might have been deleted while reading so if not found we try to delete the flow
flows.stream()
.filter(flow -> flow.getPath().equals(filePath.toString()))
.findFirst()
.ifPresent(flowWithPath -> {
flowFilesManager.deleteFlow(flowWithPath.getTenantId(), flowWithPath.getNamespace(), flowWithPath.getId());
this.flows.removeIf(fwp -> fwp.uidWithoutRevision().equals(flowWithPath.uidWithoutRevision()));
});
} catch (IOException e) {
log.error("Error reading file: {}", entry, e);
}

View File

@@ -1,11 +1,10 @@
package io.kestra.core.models;
import io.kestra.core.utils.MapUtils;
import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
public record Label(@NotNull String key, @NotNull String value) {
@@ -29,11 +28,36 @@ public record Label(@NotNull String key, @NotNull String value) {
* @return the nested {@link Map}.
*/
public static Map<String, Object> toNestedMap(List<Label> labels) {
Map<String, Object> asMap = labels.stream()
return MapUtils.flattenToNestedMap(toMap(labels));
}
/**
* Static helper method for converting a list of labels to a flat map.
* Key order is kept.
*
* @param labels The list of {@link Label} to be converted.
* @return the flat {@link Map}.
*/
public static Map<String, String> toMap(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyMap();
return labels.stream()
.filter(label -> label.value() != null && label.key() != null)
// using an accumulator in case labels with the same key exists: the first is kept
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> first));
return MapUtils.flattenToNestedMap(asMap);
// using an accumulator in case labels with the same key exists: the second is kept
.collect(Collectors.toMap(Label::key, Label::value, (first, second) -> second, LinkedHashMap::new));
}
/**
* Static helper method for deduplicating a list of labels by their key.
* Value of the last key occurrence is kept.
*
* @param labels The list of {@link Label} to be deduplicated.
* @return the deduplicated {@link List}.
*/
public static List<Label> deduplicate(@Nullable List<Label> labels) {
if (labels == null || labels.isEmpty()) return Collections.emptyList();
return toMap(labels).entrySet().stream()
.map(entry -> new Label(entry.getKey(), entry.getValue()))
.collect(Collectors.toCollection(ArrayList::new));
}
/**

View File

@@ -6,9 +6,9 @@ import com.fasterxml.jackson.annotation.JsonValue;
import io.kestra.core.exceptions.InvalidQueryFiltersException;
import io.kestra.core.models.dashboards.filters.*;
import io.kestra.core.utils.Enums;
import java.util.ArrayList;
import lombok.Builder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -49,42 +49,27 @@ public record QueryFilter(
PREFIX
}
@SuppressWarnings("unchecked")
private List<Object> asValues(Object value) {
return value instanceof String valueStr ? Arrays.asList(valueStr.split(",")) : (List<Object>) value;
}
@SuppressWarnings("unchecked")
public <T extends Enum<T>> AbstractFilter<T> toDashboardFilterBuilder(T field, Object value) {
switch (this.operation) {
case EQUALS:
return EqualTo.<T>builder().field(field).value(value).build();
case NOT_EQUALS:
return NotEqualTo.<T>builder().field(field).value(value).build();
case GREATER_THAN:
return GreaterThan.<T>builder().field(field).value(value).build();
case LESS_THAN:
return LessThan.<T>builder().field(field).value(value).build();
case GREATER_THAN_OR_EQUAL_TO:
return GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
case LESS_THAN_OR_EQUAL_TO:
return LessThanOrEqualTo.<T>builder().field(field).value(value).build();
case IN:
return In.<T>builder().field(field).values(asValues(value)).build();
case NOT_IN:
return NotIn.<T>builder().field(field).values(asValues(value)).build();
case STARTS_WITH:
return StartsWith.<T>builder().field(field).value(value.toString()).build();
case ENDS_WITH:
return EndsWith.<T>builder().field(field).value(value.toString()).build();
case CONTAINS:
return Contains.<T>builder().field(field).value(value.toString()).build();
case REGEX:
return Regex.<T>builder().field(field).value(value.toString()).build();
case PREFIX:
return Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
default:
throw new IllegalArgumentException("Unsupported operation: " + this.operation);
}
return switch (this.operation) {
case EQUALS -> EqualTo.<T>builder().field(field).value(value).build();
case NOT_EQUALS -> NotEqualTo.<T>builder().field(field).value(value).build();
case GREATER_THAN -> GreaterThan.<T>builder().field(field).value(value).build();
case LESS_THAN -> LessThan.<T>builder().field(field).value(value).build();
case GREATER_THAN_OR_EQUAL_TO -> GreaterThanOrEqualTo.<T>builder().field(field).value(value).build();
case LESS_THAN_OR_EQUAL_TO -> LessThanOrEqualTo.<T>builder().field(field).value(value).build();
case IN -> In.<T>builder().field(field).values(asValues(value)).build();
case NOT_IN -> NotIn.<T>builder().field(field).values(asValues(value)).build();
case STARTS_WITH -> StartsWith.<T>builder().field(field).value(value.toString()).build();
case ENDS_WITH -> EndsWith.<T>builder().field(field).value(value.toString()).build();
case CONTAINS -> Contains.<T>builder().field(field).value(value.toString()).build();
case REGEX -> Regex.<T>builder().field(field).value(value.toString()).build();
case PREFIX -> Regex.<T>builder().field(field).value("^" + value.toString().replace(".", "\\.") + "(?:\\..+)?$").build();
};
}
public enum Field {

View File

@@ -25,6 +25,7 @@ import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.LabelService;
import io.kestra.core.test.flow.TaskFixture;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.MapUtils;
import io.swagger.v3.oas.annotations.Hidden;
import jakarta.annotation.Nullable;
@@ -131,12 +132,12 @@ public class Execution implements DeletedInterface, TenantInterface {
* @param labels The Flow labels.
* @return a new {@link Execution}.
*/
public static Execution newExecution(final Flow flow, final List<Label> labels) {
public static Execution newExecution(final FlowInterface flow, final List<Label> labels) {
return newExecution(flow, null, labels, Optional.empty());
}
public List<Label> getLabels() {
return Optional.ofNullable(this.labels).orElse(new ArrayList<>());
return ListUtils.emptyOnNull(this.labels);
}
/**
@@ -181,8 +182,22 @@ public class Execution implements DeletedInterface, TenantInterface {
}
/**
* Customization of Lombok-generated builder.
*/
public static class ExecutionBuilder {
/**
* Enforce unique values of {@link Label} when using the builder.
*
* @param labels The labels.
* @return Deduplicated labels.
*/
public ExecutionBuilder labels(List<Label> labels) {
this.labels = Label.deduplicate(labels);
return this;
}
void prebuild() {
this.originalId = this.id;
this.metadata = ExecutionMetadata.builder()
@@ -231,7 +246,6 @@ public class Execution implements DeletedInterface, TenantInterface {
}
public Execution withLabels(List<Label> labels) {
return new Execution(
this.tenantId,
this.id,
@@ -241,7 +255,7 @@ public class Execution implements DeletedInterface, TenantInterface {
this.taskRunList,
this.inputs,
this.outputs,
labels,
Label.deduplicate(labels),
this.variables,
this.state,
this.parentId,
@@ -400,7 +414,7 @@ public class Execution implements DeletedInterface, TenantInterface {
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedErrors finally tasks
* @param resolvedFinally finally tasks
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(

View File

@@ -38,6 +38,8 @@ public abstract class AbstractFlow implements FlowInterface {
@Min(value = 1)
Integer revision;
String description;
@Valid
List<Input<?>> inputs;

View File

@@ -61,13 +61,10 @@ public class Flow extends AbstractFlow implements HasUID {
}
});
String description;
Map<String, Object> variables;
@Valid
@NotEmpty
@Schema(additionalProperties = Schema.AdditionalPropertiesValue.TRUE)
List<Task> tasks;

View File

@@ -31,6 +31,8 @@ public interface FlowInterface extends FlowId, DeletedInterface, TenantInterface
Pattern YAML_REVISION_MATCHER = Pattern.compile("(?m)^revision: \\d+\n?");
String getDescription();
boolean isDisabled();
boolean isDeleted();

View File

@@ -20,9 +20,8 @@ public class FileInput extends Input<URI> {
private static final String DEFAULT_EXTENSION = ".upl";
@Builder.Default
@Deprecated(since = "0.24", forRemoval = true)
public String extension = DEFAULT_EXTENSION;
public String extension;
@Override
public void validate(URI input) throws ConstraintViolationException {
@@ -33,6 +32,7 @@ public class FileInput extends Input<URI> {
String res = inputs.stream()
.filter(in -> in instanceof FileInput)
.filter(in -> in.getId().equals(fileName))
.filter(flowInput -> ((FileInput) flowInput).getExtension() != null)
.map(flowInput -> ((FileInput) flowInput).getExtension())
.findFirst()
.orElse(FileInput.DEFAULT_EXTENSION);

View File

@@ -28,6 +28,7 @@ public interface QueueFactoryInterface {
String SUBFLOWEXECUTIONRESULT_NAMED = "subflowExecutionResultQueue";
String CLUSTER_EVENT_NAMED = "clusterEventQueue";
String SUBFLOWEXECUTIONEND_NAMED = "subflowExecutionEndQueue";
String EXECUTION_RUNNING_NAMED = "executionRunningQueue";
QueueInterface<Execution> execution();
@@ -58,4 +59,6 @@ public interface QueueFactoryInterface {
QueueInterface<SubflowExecutionResult> subflowExecutionResult();
QueueInterface<SubflowExecutionEnd> subflowExecutionEnd();
QueueInterface<ExecutionRunning> executionRunning();
}

View File

@@ -27,7 +27,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
void delete(String consumerGroup, T message) throws QueueException;
default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
return receive((String) null, consumer);
return receive(null, consumer, false);
}
default Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {

View File

@@ -27,8 +27,6 @@ public class QueueService {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == MetricEntry.class) {
return null;
} else if (object.getClass() == ExecutionRunning.class) {
return ((ExecutionRunning) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionEnd.class) {
return ((SubflowExecutionEnd) object).getParentExecutionId();
} else {

View File

@@ -161,7 +161,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
}
List<Execution> lastExecutions(
@Nullable String tenantId,
String tenantId,
@Nullable List<FlowFilter> flows
);
}

View File

@@ -82,6 +82,8 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
Flux<LogEntry> findAsync(
@Nullable String tenantId,
@Nullable String namespace,
@Nullable String flowId,
@Nullable String executionId,
@Nullable Level minLevel,
ZonedDateTime startDate
);

View File

@@ -1,5 +1,6 @@
package io.kestra.core.runners;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.utils.IdUtils;
import jakarta.validation.constraints.NotNull;
@@ -11,7 +12,7 @@ import lombok.With;
@Value
@AllArgsConstructor
@Builder
public class ExecutionRunning {
public class ExecutionRunning implements HasUID {
String tenantId;
@NotNull
@@ -26,6 +27,7 @@ public class ExecutionRunning {
@With
ConcurrencyState concurrencyState;
@Override
public String uid() {
return IdUtils.fromPartsAndSeparator('|', this.tenantId, this.namespace, this.flowId, this.execution.getId());
}

View File

@@ -102,49 +102,39 @@ public class ExecutorService {
return this.flowExecutorInterface;
}
public Executor checkConcurrencyLimit(Executor executor, FlowInterface flow, Execution execution, long count) {
// if above the limit, handle concurrency limit based on its behavior
if (count >= flow.getConcurrency().getLimit()) {
public ExecutionRunning processExecutionRunning(FlowInterface flow, int runningCount, ExecutionRunning executionRunning) {
// if concurrency was removed, it can be null as we always get the latest flow definition
if (flow.getConcurrency() != null && runningCount >= flow.getConcurrency().getLimit()) {
return switch (flow.getConcurrency().getBehavior()) {
case QUEUE -> {
var newExecution = execution.withState(State.Type.QUEUED);
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(flow.getTenantId())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.execution(newExecution)
.concurrencyState(ExecutionRunning.ConcurrencyState.QUEUED)
.build();
// when max concurrency is reached, we throttle the execution and stop processing
logService.logExecution(
newExecution,
executionRunning.getExecution(),
Level.INFO,
"Flow is queued due to concurrency limit exceeded, {} running(s)",
count
"Execution is queued due to concurrency limit exceeded, {} running(s)",
runningCount
);
// return the execution queued
yield executor
.withExecutionRunning(executionRunning)
.withExecution(newExecution, "checkConcurrencyLimit");
var newExecution = executionRunning.getExecution().withState(State.Type.QUEUED);
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
yield executionRunning
.withExecution(newExecution)
.withConcurrencyState(ExecutionRunning.ConcurrencyState.QUEUED);
}
case CANCEL ->
executor.withExecution(execution.withState(State.Type.CANCELLED), "checkConcurrencyLimit");
executionRunning
.withExecution(executionRunning.getExecution().withState(State.Type.CANCELLED))
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
case FAIL ->
executor.withException(new IllegalStateException("Flow is FAILED due to concurrency limit exceeded"), "checkConcurrencyLimit");
executionRunning
.withExecution(executionRunning.getExecution().failedExecutionFromExecutor(new IllegalStateException("Execution is FAILED due to concurrency limit exceeded")).getExecution())
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
};
}
// if under the limit, update the executor with a RUNNING ExecutionRunning to track them
var executionRunning = new ExecutionRunning(
flow.getTenantId(),
flow.getNamespace(),
flow.getId(),
executor.getExecution(),
ExecutionRunning.ConcurrencyState.RUNNING
);
return executor.withExecutionRunning(executionRunning);
// if under the limit, run it!
return executionRunning
.withExecution(executionRunning.getExecution().withState(State.Type.RUNNING))
.withConcurrencyState(ExecutionRunning.ConcurrencyState.RUNNING);
}
public Executor process(Executor executor) {

View File

@@ -286,18 +286,10 @@ public class FlowableUtils {
// start as many tasks as we have concurrency slots
return collect.values().stream()
.map(resolvedTasks -> filterCreated(resolvedTasks, taskRuns, parentTaskRun))
.map(resolvedTasks -> resolveSequentialNexts(execution, resolvedTasks, null, null, parentTaskRun))
.filter(resolvedTasks -> !resolvedTasks.isEmpty())
.limit(concurrencySlots)
.map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
.toList();
}
private static List<ResolvedTask> filterCreated(List<ResolvedTask> tasks, List<TaskRun> taskRuns, TaskRun parentTaskRun) {
return tasks.stream()
.filter(resolvedTask -> taskRuns.stream()
.noneMatch(taskRun -> FlowableUtils.isTaskRunFor(resolvedTask, taskRun, parentTaskRun))
)
.map(resolvedTasks -> resolvedTasks.getFirst())
.toList();
}

View File

@@ -102,6 +102,19 @@ public abstract class AbstractDate {
}
if (value instanceof Long longValue) {
if(value.toString().length() == 13) {
return Instant.ofEpochMilli(longValue).atZone(zoneId);
}else if(value.toString().length() == 19 ){
if(value.toString().endsWith("000")){
long seconds = longValue/1_000_000_000;
int nanos = (int) (longValue%1_000_000_000);
return Instant.ofEpochSecond(seconds,nanos).atZone(zoneId);
}else{
long milliseconds = longValue/1_000_000;
int micros = (int) (longValue%1_000_000);
return Instant.ofEpochMilli(milliseconds).atZone(zoneId).withNano(micros*1000);
}
}
return Instant.ofEpochSecond(longValue).atZone(zoneId);
}

View File

@@ -8,6 +8,7 @@ import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
@@ -318,7 +319,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
}
synchronized (this) { // we need a sync block as we read then update so we should not do it in multiple threads concurrently
List<Trigger> triggers = triggerState.findAllForAllTenants();
Map<String, Trigger> triggers = triggerState.findAllForAllTenants().stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
flows
.stream()
@@ -328,7 +329,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.flatMap(flow -> flow.getTriggers().stream().filter(trigger -> trigger instanceof WorkerTriggerInterface).map(trigger -> new FlowAndTrigger(flow, trigger)))
.distinct()
.forEach(flowAndTrigger -> {
Optional<Trigger> trigger = triggers.stream().filter(t -> t.uid().equals(Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger()))).findFirst(); // must have one or none
String triggerUid = Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger());
Optional<Trigger> trigger = Optional.ofNullable(triggers.get(triggerUid));
if (trigger.isEmpty()) {
RunContext runContext = runContextFactory.of(flowAndTrigger.flow(), flowAndTrigger.trigger());
ConditionContext conditionContext = conditionService.conditionContext(runContext, flowAndTrigger.flow(), null);
@@ -467,9 +469,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private List<FlowWithTriggers> computeSchedulable(List<FlowWithSource> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
List<String> flowIds = flows.stream().map(FlowId::uidWithoutRevision).toList();
Map<String, Trigger> triggerById = triggerContextsToEvaluate.stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
// delete trigger which flow has been deleted
triggerContextsToEvaluate.stream()
.filter(trigger -> !flows.stream().map(FlowId::uidWithoutRevision).toList().contains(FlowId.uid(trigger)))
.filter(trigger -> !flowIds.contains(FlowId.uid(trigger)))
.forEach(trigger -> {
try {
this.triggerState.delete(trigger);
@@ -491,12 +496,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
.map(abstractTrigger -> {
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
Trigger triggerContext = null;
Trigger lastTrigger = triggerContextsToEvaluate
.stream()
.filter(triggerContextToFind -> triggerContextToFind.uid().equals(Trigger.uid(flow, abstractTrigger)))
.findFirst()
.orElse(null);
Trigger triggerContext;
Trigger lastTrigger = triggerById.get(Trigger.uid(flow, abstractTrigger));
// If a trigger is not found in triggers to evaluate, then we ignore it
if (lastTrigger == null) {
return null;

View File

@@ -250,9 +250,10 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
stateLock.lock();
// Optional callback to be executed at the end.
Runnable returnCallback = null;
localServiceState = localServiceState(service);
try {
localServiceState = localServiceState(service);
if (localServiceState == null) {
return null; // service has been unregistered.
}
@@ -301,7 +302,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
// Update the local instance
this.serviceRegistry.register(localServiceState.with(remoteInstance));
} catch (Exception e) {
final ServiceInstance localInstance = localServiceState(service).instance();
final ServiceInstance localInstance = localServiceState.instance();
log.error("[Service id={}, type='{}', hostname='{}'] Failed to update state to {}. Error: {}",
localInstance.uid(),
localInstance.type(),
@@ -317,7 +318,7 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
returnCallback.run();
}
}
return localServiceState(service).instance();
return Optional.ofNullable(localServiceState(service)).map(LocalServiceState::instance).orElse(null);
}
private void mayDisableStateUpdate(final Service service, final ServiceInstance instance) {
@@ -371,9 +372,11 @@ public class ServiceLivenessManager extends AbstractServiceLivenessTask {
final Service service,
final ServiceInstance instance,
final boolean isLivenessEnabled) {
// Never shutdown STANDALONE server or WEB_SERVER service.
if (instance.server().type().equals(ServerInstance.Type.STANDALONE) ||
instance.is(ServiceType.WEBSERVER)) {
// Never shutdown STANDALONE server or WEBSERVER and INDEXER services.
if (ServerInstance.Type.STANDALONE.equals(instance.server().type()) ||
instance.is(ServiceType.INDEXER) ||
instance.is(ServiceType.WEBSERVER)
) {
// Force the RUNNING state.
return Optional.of(instance.state(Service.ServiceState.RUNNING, now, null));
}

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ModelValidator;
@@ -51,7 +52,6 @@ import java.util.stream.StreamSupport;
@Singleton
@Slf4j
public class FlowService {
@Inject
Optional<FlowRepositoryInterface> flowRepository;
@@ -236,6 +236,7 @@ public class FlowService {
}
List<String> warnings = new ArrayList<>(checkValidSubflows(flow, tenantId));
List<io.kestra.plugin.core.trigger.Flow> flowTriggers = ListUtils.emptyOnNull(flow.getTriggers()).stream()
.filter(io.kestra.plugin.core.trigger.Flow.class::isInstance)
.map(io.kestra.plugin.core.trigger.Flow.class::cast)
@@ -246,6 +247,21 @@ public class FlowService {
}
});
// add warning for runnable properties (timeout, workerGroup, taskCache) when used not in a runnable
flow.allTasksWithChilds().forEach(task -> {
if (!(task instanceof RunnableTask<?>)) {
if (task.getTimeout() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'timeout' property as it's only relevant for runnable tasks.");
}
if (task.getTaskCache() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'taskCache' property as it's only relevant for runnable tasks.");
}
if (task.getWorkerGroup() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'workerGroup' property as it's only relevant for runnable tasks.");
}
}
});
return warnings;
}

View File

@@ -0,0 +1,77 @@
package io.kestra.core.validations;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.env.Environment;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import java.io.Serial;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.List;
/**
* Enforces validation rules upon the application configuration.
*/
@Slf4j
@Context
public class AppConfigValidator {
private static final String KESTRA_URL_KEY = "kestra.url";
private final Environment environment;
@Inject
public AppConfigValidator(Environment environment) {
this.environment = environment;
}
@PostConstruct
void validate() {
final List<Boolean> validationResults = List.of(
isKestraUrlValid()
);
if (validationResults.contains(false)) {
throw new AppConfigException("Invalid configuration");
}
}
private boolean isKestraUrlValid() {
if (!environment.containsProperty(KESTRA_URL_KEY)) {
return true;
}
final String rawUrl = environment.getProperty(KESTRA_URL_KEY, String.class).orElseThrow();
final URL url;
try {
url = URI.create(rawUrl).toURL();
} catch (IllegalArgumentException | MalformedURLException e) {
log.error(
"Value of the '{}' configuration property must be a valid URL - e.g. https://your.company.com",
KESTRA_URL_KEY
);
return false;
}
if (!List.of("http", "https").contains(url.getProtocol())) {
log.error(
"Value of the '{}' configuration property must contain either HTTP or HTTPS scheme - e.g. https://your.company.com",
KESTRA_URL_KEY
);
return false;
}
return true;
}
public static class AppConfigException extends RuntimeException {
@Serial
private static final long serialVersionUID = 1L;
public AppConfigException(String errorMessage) {
super(errorMessage);
}
}
}

View File

@@ -54,9 +54,10 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it.");
}
List<Task> allTasks = value.allTasksWithChilds();
// tasks unique id
List<String> taskIds = value.allTasksWithChilds()
.stream()
List<String> taskIds = allTasks.stream()
.map(Task::getId)
.toList();
@@ -72,8 +73,8 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
violations.add("Duplicate trigger id with name [" + String.join(", ", duplicateIds) + "]");
}
value.allTasksWithChilds()
.stream().filter(task -> task instanceof ExecutableTask<?> executableTask
allTasks.stream()
.filter(task -> task instanceof ExecutableTask<?> executableTask
&& value.getId().equals(executableTask.subflowId().flowId())
&& value.getNamespace().equals(executableTask.subflowId().namespace()))
.forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));
@@ -102,7 +103,7 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
.map(input -> Pattern.compile("\\{\\{\\s*inputs." + input.getId() + "\\s*\\}\\}"))
.collect(Collectors.toList());
List<String> invalidTasks = value.allTasks()
List<String> invalidTasks = allTasks.stream()
.filter(task -> checkObjectFieldsWithPatterns(task, inputsWithMinusPatterns))
.map(task -> task.getId())
.collect(Collectors.toList());
@@ -112,12 +113,12 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
" [" + String.join(", ", invalidTasks) + "]");
}
List<Pattern> outputsWithMinusPattern = value.allTasks()
List<Pattern> outputsWithMinusPattern = allTasks.stream()
.filter(output -> Optional.ofNullable(output.getId()).orElse("").contains("-"))
.map(output -> Pattern.compile("\\{\\{\\s*outputs\\." + output.getId() + "\\.[^}]+\\s*\\}\\}"))
.collect(Collectors.toList());
invalidTasks = value.allTasks()
invalidTasks = allTasks.stream()
.filter(task -> checkObjectFieldsWithPatterns(task, outputsWithMinusPattern))
.map(task -> task.getId())
.collect(Collectors.toList());

View File

@@ -19,7 +19,6 @@ import lombok.experimental.SuperBuilder;
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@EqualsAndHashCode
//@TriggersDataFilterValidation
@Schema(
title = "Display Execution data in a dashboard chart.",
description = "Execution data can be displayed in charts broken out by Namespace and filtered by State, for example."

View File

@@ -111,8 +111,9 @@ public class Labels extends Task implements ExecutionUpdatableTask {
})
).collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue
));
Map.Entry::getValue,
(first, second) -> second)
);
} else if (labels instanceof Map<?, ?> map) {
labelsAsMap = map.entrySet()
.stream()

View File

@@ -208,48 +208,50 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
return Optional.empty();
}
boolean isOutputsAllowed = runContext
.<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
.orElse(true);
final Output.OutputBuilder builder = Output.builder()
.executionId(execution.getId())
.state(execution.getState().getCurrent());
final Map<String, Object> subflowOutputs = Optional
.ofNullable(flow.getOutputs())
.map(outputs -> outputs
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
)
)
.orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
VariablesService variablesService = ((DefaultRunContext) runContext).getApplicationContext().getBean(VariablesService.class);
if (subflowOutputs != null) {
try {
Map<String, Object> outputs = runContext.render(subflowOutputs);
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
if (flow.getOutputs() != null && flowInputOutput != null) {
outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
}
builder.outputs(outputs);
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
var state = State.Type.fail(this);
Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
taskRun = taskRun
.withState(state)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
.withOutputs(variables);
if (this.wait) { // we only compute outputs if we wait for the subflow
boolean isOutputsAllowed = runContext
.<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
.orElse(true);
return Optional.of(SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(State.Type.FAILED)
.parentTaskRun(taskRun)
.build());
final Map<String, Object> subflowOutputs = Optional
.ofNullable(flow.getOutputs())
.map(outputs -> outputs
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
)
)
.orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
if (subflowOutputs != null) {
try {
Map<String, Object> outputs = runContext.render(subflowOutputs);
FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
if (flow.getOutputs() != null && flowInputOutput != null) {
outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
}
builder.outputs(outputs);
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
var state = State.Type.fail(this);
Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
taskRun = taskRun
.withState(state)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
.withOutputs(variables);
return Optional.of(SubflowExecutionResult.builder()
.executionId(execution.getId())
.state(State.Type.FAILED)
.parentTaskRun(taskRun)
.build());
}
}
}

View File

@@ -0,0 +1,11 @@
<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
<g clip-path="url(#clip0_1765_9330)">
<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
</g>
</svg>

After

Width:  |  Height:  |  Size: 2.1 KiB

View File

@@ -0,0 +1,11 @@
<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
<g clip-path="url(#clip0_1765_9330)">
<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
</g>
</svg>

After

Width:  |  Height:  |  Size: 2.1 KiB

View File

@@ -1,11 +1,12 @@
package io.kestra.core.models;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class LabelTest {
@Test
@@ -15,9 +16,8 @@ class LabelTest {
new Label(Label.CORRELATION_ID, "id"))
);
Assertions.assertEquals(
Map.of("system", Map.of("username", "test", "correlationId", "id")),
result
assertThat(result).isEqualTo(
Map.of("system", Map.of("username", "test", "correlationId", "id"))
);
}
@@ -29,9 +29,48 @@ class LabelTest {
new Label(Label.CORRELATION_ID, "id"))
);
Assertions.assertEquals(
Map.of("system", Map.of("username", "test1", "correlationId", "id")),
result
assertThat(result).isEqualTo(
Map.of("system", Map.of("username", "test2", "correlationId", "id"))
);
}
@Test
void shouldGetMapGivenDistinctLabels() {
Map<String, String> result = Label.toMap(List.of(
new Label(Label.USERNAME, "test"),
new Label(Label.CORRELATION_ID, "id"))
);
assertThat(result).isEqualTo(
Map.of(Label.USERNAME, "test", Label.CORRELATION_ID, "id")
);
}
@Test
void shouldGetMapGivenDuplicateLabels() {
Map<String, String> result = Label.toMap(List.of(
new Label(Label.USERNAME, "test1"),
new Label(Label.USERNAME, "test2"),
new Label(Label.CORRELATION_ID, "id"))
);
assertThat(result).isEqualTo(
Map.of(Label.USERNAME, "test2", Label.CORRELATION_ID, "id")
);
}
@Test
void shouldDuplicateLabelsWithKeyOrderKept() {
List<Label> result = Label.deduplicate(List.of(
new Label(Label.USERNAME, "test1"),
new Label(Label.USERNAME, "test2"),
new Label(Label.CORRELATION_ID, "id"),
new Label(Label.USERNAME, "test3"))
);
assertThat(result).containsExactly(
new Label(Label.USERNAME, "test3"),
new Label(Label.CORRELATION_ID, "id")
);
}
}

View File

@@ -2,6 +2,7 @@ package io.kestra.core.models.executions;
import io.kestra.core.models.Label;
import io.kestra.core.utils.IdUtils;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import io.kestra.core.models.flows.State;
@@ -157,7 +158,58 @@ class ExecutionTest {
.labels(List.of(new Label("test", "test-value")))
.build();
assertThat(execution.getLabels().size()).isEqualTo(1);
assertThat(execution.getLabels().getFirst()).isEqualTo(new Label("test", "test-value"));
assertThat(execution.getLabels()).containsExactly(new Label("test", "test-value"));
}
@Test
void labelsGetDeduplicated() {
final List<Label> duplicatedLabels = List.of(
new Label("test", "value1"),
new Label("test", "value2")
);
final Execution executionWithLabels = Execution.builder()
.build()
.withLabels(duplicatedLabels);
assertThat(executionWithLabels.getLabels()).containsExactly(new Label("test", "value2"));
final Execution executionBuilder = Execution.builder()
.labels(duplicatedLabels)
.build();
assertThat(executionBuilder.getLabels()).containsExactly(new Label("test", "value2"));
}
@Test
@Disabled("Solve label deduplication on instantization")
void labelsGetDeduplicatedOnNewInstance() {
final List<Label> duplicatedLabels = List.of(
new Label("test", "value1"),
new Label("test", "value2")
);
final Execution executionNew = new Execution(
"foo",
"id",
"namespace",
"flowId",
1,
Collections.emptyList(),
Map.of(),
Map.of(),
duplicatedLabels,
Map.of(),
State.of(State.Type.SUCCESS, Collections.emptyList()),
"parentId",
"originalId",
null,
false,
null,
null,
null,
null,
null,
null
);
assertThat(executionNew.getLabels()).containsExactly(new Label("test", "value2"));
}
}

View File

@@ -44,6 +44,7 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.USER;
@@ -740,4 +741,16 @@ public abstract class AbstractExecutionRepositoryTest {
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
assertThat(executions.size()).isEqualTo(0L);
}
@Test
protected void shouldReturnLastExecutionsWhenInputsAreNull() {
inject();
List<Execution> lastExecutions = executionRepository.lastExecutions(MAIN_TENANT, null);
assertThat(lastExecutions).isNotEmpty();
Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet());
assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
}
}

View File

@@ -46,7 +46,7 @@ public abstract class AbstractLogRepositoryTest {
.flowId("flowId")
.namespace("io.kestra.unittest")
.taskId("taskId")
.executionId(IdUtils.create())
.executionId("executionId")
.taskRunId(IdUtils.create())
.attemptNumber(0)
.timestamp(Instant.now())
@@ -293,19 +293,23 @@ public abstract class AbstractLogRepositoryTest {
ZonedDateTime startDate = ZonedDateTime.now().minusSeconds(1);
Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unittest", Level.INFO, startDate);
Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unittest", null, null, Level.INFO, startDate);
List<LogEntry> logEntries = find.collectList().block();
assertThat(logEntries).hasSize(3);
find = logRepository.findAsync(MAIN_TENANT, null, Level.ERROR, startDate);
find = logRepository.findAsync(MAIN_TENANT, null, null, null, Level.ERROR, startDate);
logEntries = find.collectList().block();
assertThat(logEntries).hasSize(1);
find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unused", Level.INFO, startDate);
find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unittest", "flowId", null, Level.ERROR, startDate);
logEntries = find.collectList().block();
assertThat(logEntries).hasSize(1);
find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unused", "flowId", "executionId", Level.INFO, startDate);
logEntries = find.collectList().block();
assertThat(logEntries).hasSize(0);
find = logRepository.findAsync(MAIN_TENANT, null, Level.INFO, startDate.plusSeconds(2));
find = logRepository.findAsync(MAIN_TENANT, null, null, null, Level.INFO, startDate.plusSeconds(2));
logEntries = find.collectList().block();
assertThat(logEntries).hasSize(0);
}

View File

@@ -417,6 +417,12 @@ public abstract class AbstractRunnerTest {
flowConcurrencyCaseTest.flowConcurrencyCancelPause();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-for-each-item.yaml", "flows/valids/flow-concurrency-queue.yml"})
protected void flowConcurrencyWithForEachItem() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
}
@Test
@ExecuteFlow("flows/valids/executable-fail.yml")
void badExecutable(Execution execution) {

View File

@@ -8,18 +8,28 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import reactor.core.publisher.Flux;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@@ -28,7 +38,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Singleton
public class FlowConcurrencyCaseTest {
@Inject
private RunnerUtils runnerUtils;
private StorageInterface storageInterface;
@Inject
protected RunnerUtils runnerUtils;
@Inject
private FlowInputOutput flowIO;
@Inject
private FlowRepositoryInterface flowRepository;
@@ -237,4 +253,49 @@ public class FlowConcurrencyCaseTest {
assertThat(secondExecutionResult.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(secondExecutionResult.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.CANCELLED);
}
public void flowConcurrencyWithForEachItem() throws TimeoutException, QueueException, InterruptedException, URISyntaxException, IOException {
URI file = storageUpload();
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
Execution forEachItem = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-for-each-item", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs), Duration.ofSeconds(5));
assertThat(forEachItem.getState().getCurrent()).isEqualTo(Type.RUNNING);
Set<String> executionIds = new HashSet<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if ("flow-concurrency-queue".equals(e.getLeft().getFlowId()) && e.getLeft().getState().isRunning()) {
executionIds.add(e.getLeft().getId());
}
});
// wait a little to be sure there are not too many executions started
Thread.sleep(500);
assertThat(executionIds).hasSize(1);
receive.blockLast();
Execution terminated = runnerUtils.awaitExecution(e -> e.getId().equals(forEachItem.getId()) && e.getState().isTerminated(), () -> {}, Duration.ofSeconds(10));
assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
}
private URI storageUpload() throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");
Files.write(tempFile.toPath(), content());
return storageInterface.put(
MAIN_TENANT,
null,
new URI("/file/storage/file.txt"),
new FileInputStream(tempFile)
);
}
private List<String> content() {
return IntStream
.range(0, 7)
.mapToObj(value -> StringUtils.leftPad(value + "", 20))
.toList();
}
}

View File

@@ -273,7 +273,7 @@ public class RestartCaseTest {
// wait
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
execution -> execution.getState().getCurrent() == State.Type.SUCCESS && execution.getId().equals(firstExecution.getId()),
execution -> executionService.isTerminated(flow, execution) && execution.getState().isSuccess() && execution.getId().equals(firstExecution.getId()),
throwRunnable(() -> {
Execution restartedExec = executionService.restart(firstExecution, null);
assertThat(restartedExec).isNotNull();

View File

@@ -9,6 +9,7 @@ import io.kestra.core.utils.TestsUtils;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;
@@ -109,7 +110,8 @@ class RunContextLoggerTest {
logger.info("test myawesomepassmyawesomepass myawesomepass myawesomepassmyawesomepass");
logger.warn("test {}", URI.create("http://it-s.secret"));
matchingLog = TestsUtils.awaitLogs(logs, 3);
// the 3 logs will create 4 log entries as exceptions stacktraces are logged separately at the TRACE level
matchingLog = TestsUtils.awaitLogs(logs, 4);
receive.blockLast();
assertThat(matchingLog.stream().filter(logEntry -> logEntry.getLevel().equals(Level.DEBUG)).findFirst().orElseThrow().getMessage()).isEqualTo("test john@****** test");
assertThat(matchingLog.stream().filter(logEntry -> logEntry.getLevel().equals(Level.TRACE)).findFirst().orElseThrow().getMessage()).contains("exception from doe.com");

View File

@@ -71,6 +71,8 @@ class DateFilterTest {
{{ "2013-09-08T17:19:12+02:00" | date(timeZone="Europe/Paris") }}
{{ "2013-09-08T17:19:12" | date(timeZone="Europe/Paris") }}
{{ "2013-09-08" | date(timeZone="Europe/Paris") }}
{{ "08.09.2023" | date("yyyy-MM-dd", existingFormat="dd.MM.yyyy") }}
{{ "08092023" | date("yyyy-MM-dd", existingFormat="ddMMyyyy") }}
""",
Map.of()
);
@@ -80,6 +82,8 @@ class DateFilterTest {
2013-09-08T17:19:12.000000+02:00
2013-09-08T17:19:12.000000+02:00
2013-09-08T00:00:00.000000+02:00
2023-09-08
2023-09-08
""");
}
@@ -171,7 +175,9 @@ class DateFilterTest {
render = variableRenderer.render("{{ now(format=\"sql_milli\") }}", ImmutableMap.of());
assertThat(render).isEqualTo(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
// a millisecond can pass between the render and now so we can't assert on a precise to millisecond date
assertThat(render).startsWith(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
assertThat(render).hasSize(23);
}
@Test
@@ -185,4 +191,41 @@ class DateFilterTest {
assertThat(render).isEqualTo("2013-09-07T17:19:12.123456+02:00");
}
@Test
void timestampDateFormat() throws IllegalVariableEvaluationException {
String render =
variableRenderer.render(
"""
{{ 1378653552 | date(format="iso_sec", timeZone="Europe/Paris") }}
{{ 1378653552123 | date(format="iso_milli", timeZone="Europe/Paris") }}
{{ 1378653552123 | date(timeZone="Europe/Paris") }}
{{ 1378653552123 | date(format="iso_zoned_date_time", timeZone="Europe/Paris") }}
{{ 1378653552123456000 | date(format="iso", timeZone="Europe/Paris") }}
{{ 1378653552000123456 | date(format="iso", timeZone="Europe/Paris") }}
{{ 1378653552 | date(format="sql_sec", timeZone="Europe/Paris") }}
{{ 1378653552123 | date(format="sql_milli", timeZone="Europe/Paris") }}
{{ 1378653552123456000 | date(format="sql", timeZone="Europe/Paris") }}
{{ 1378653552000123456 | date(format="sql", timeZone="Europe/Paris") }}
{{ 1378653552123 | date(format="sql_milli", timeZone="UTC") }}
{{ "1378653552123" | number | date(format="sql_milli", timeZone="UTC") }}
""",
Map.of());
assertThat(render).isEqualTo("""
2013-09-08T17:19:12+02:00
2013-09-08T17:19:12.123+02:00
2013-09-08T17:19:12.123000+02:00
2013-09-08T17:19:12.123+02:00[Europe/Paris]
2013-09-08T17:19:12.123456+02:00
2013-09-08T17:19:12.123456+02:00
2013-09-08 17:19:12
2013-09-08 17:19:12.123
2013-09-08 17:19:12.123456
2013-09-08 17:19:12.123456
2013-09-08 15:19:12.123
2013-09-08 15:19:12.123
""");
}
}

View File

@@ -372,4 +372,44 @@ class FlowServiceTest {
assertThat(exceptions.size()).isZero();
}
@Test
void shouldReturnValidationForRunnablePropsOnFlowable() {
// Given
String source = """
id: dolphin_164914
namespace: company.team
tasks:
- id: for
type: io.kestra.plugin.core.flow.ForEach
values: [1, 2, 3]
workerGroup:
key: toto
timeout: PT10S
taskCache:
enabled: true
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀
workerGroup:
key: toto
timeout: PT10S
taskCache:
enabled: true
""";
// When
List<ValidateConstraintViolation> results = flowService.validate("my-tenant", source);
// Then
assertThat(results).hasSize(1);
assertThat(results.getFirst().getWarnings()).hasSize(3);
assertThat(results.getFirst().getWarnings()).containsExactlyInAnyOrder(
"The task 'for' cannot use the 'timeout' property as it's only relevant for runnable tasks.",
"The task 'for' cannot use the 'taskCache' property as it's only relevant for runnable tasks.",
"The task 'for' cannot use the 'workerGroup' property as it's only relevant for runnable tasks."
);
}
}

View File

@@ -0,0 +1,77 @@
package io.kestra.core.validations;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.exceptions.BeanInstantiationException;
import org.junit.jupiter.api.Test;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
class AppConfigValidatorTest {
@Test
void validateNoKestraUrl() {
assertThatCode(() -> {
try (ApplicationContext context = ApplicationContext.run()) {
context.getBean(AppConfigValidator.class);
}
})
.as("The bean got initialized properly including the PostConstruct validation")
.doesNotThrowAnyException();
}
@Test
void validateValidKestraUrl() {
assertThatCode(() -> {
try (ApplicationContext context = ApplicationContext.builder()
.deduceEnvironment(false)
.properties(
Map.of("kestra.url", "https://postgres-oss.preview.dev.kestra.io")
)
.start()
) {
context.getBean(AppConfigValidator.class);
}
})
.as("The bean got initialized properly including the PostConstruct validation")
.doesNotThrowAnyException();
}
@Test
void validateInvalidKestraUrl() {
assertThatThrownBy(() -> {
try (ApplicationContext context = ApplicationContext.builder()
.deduceEnvironment(false)
.properties(
Map.of("kestra.url", "postgres-oss.preview.dev.kestra.io")
)
.start()
) {
context.getBean(AppConfigValidator.class);
}
})
.as("The bean initialization failed at PostConstruct")
.isInstanceOf(BeanInstantiationException.class)
.hasMessageContaining("Invalid configuration");
}
@Test
void validateNonHttpKestraUrl() {
assertThatThrownBy(() -> {
try (ApplicationContext context = ApplicationContext.builder()
.deduceEnvironment(false)
.properties(
Map.of("kestra.url", "ftp://postgres-oss.preview.dev.kestra.io")
)
.start()
) {
context.getBean(AppConfigValidator.class);
}
})
.as("The bean initialization failed at PostConstruct")
.isInstanceOf(BeanInstantiationException.class)
.hasMessageContaining("Invalid configuration");
}
}

View File

@@ -54,4 +54,10 @@ class ForEachTest {
assertThat(execution.findTaskRunsByTaskId("e1").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
@Test
@ExecuteFlow("flows/valids/foreach-nested.yaml")
void nested(Execution execution) {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
}
}

View File

@@ -2,7 +2,6 @@ package io.kestra.plugin.core.flow;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.is;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
@@ -104,4 +103,61 @@ class RuntimeLabelsTest {
new Label("taskRunId", labelsTaskRunId),
new Label("existingLabel", "someValue"));
}
@Test
@LoadFlows({"flows/valids/primitive-labels-flow.yml"})
void primitiveTypeLabelsOverrideExistingLabels() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(
MAIN_TENANT,
"io.kestra.tests",
"primitive-labels-flow",
null,
(flow, createdExecution) -> Map.of(
"intLabel", 42,
"boolLabel", true,
"floatLabel", 3.14f
),
null,
List.of(
new Label("intValue", "1"),
new Label("boolValue", "false"),
new Label("floatValue", "4.2f")
)
);
assertThat(execution.getTaskRunList()).hasSize(1);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
String labelsTaskRunId = execution.findTaskRunsByTaskId("update-labels").getFirst().getId();
assertThat(execution.getLabels()).containsExactlyInAnyOrder(
new Label(Label.CORRELATION_ID, execution.getId()),
new Label("intValue", "42"),
new Label("boolValue", "true"),
new Label("floatValue", "3.14"),
new Label("taskRunId", labelsTaskRunId));
}
@Test
@LoadFlows({"flows/valids/labels-update-task-deduplicate.yml"})
void updateGetsDeduplicated() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(
MAIN_TENANT,
"io.kestra.tests",
"labels-update-task-deduplicate",
null,
(flow, createdExecution) -> Map.of(),
null,
List.of()
);
assertThat(execution.getTaskRunList()).hasSize(2);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getLabels()).containsExactlyInAnyOrder(
new Label(Label.CORRELATION_ID, execution.getId()),
new Label("fromStringKey", "value2"),
new Label("fromListKey", "value2")
);
}
}

View File

@@ -0,0 +1,90 @@
package io.kestra.plugin.core.flow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.RunnerUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@KestraTest(startRunner = true)
class SubflowRunnerTest {
@Inject
private RunnerUtils runnerUtils;
@Inject
private ExecutionRepositoryInterface executionRepository;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Test
@LoadFlows({"flows/valids/subflow-inherited-labels-child.yaml", "flows/valids/subflow-inherited-labels-parent.yaml"})
void inheritedLabelsAreOverridden() throws QueueException, TimeoutException {
Execution parentExecution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-inherited-labels-parent");
assertThat(parentExecution.getLabels()).containsExactlyInAnyOrder(
new Label(Label.CORRELATION_ID, parentExecution.getId()),
new Label("parentFlowLabel1", "value1"),
new Label("parentFlowLabel2", "value2")
);
String childExecutionId = (String) parentExecution.findTaskRunsByTaskId("launch").getFirst().getOutputs().get("executionId");
assertThat(childExecutionId).isNotBlank();
Execution childExecution = executionRepository.findById(MAIN_TENANT, childExecutionId).orElseThrow();
assertThat(childExecution.getLabels()).containsExactlyInAnyOrder(
new Label(Label.CORRELATION_ID, parentExecution.getId()), // parent's correlation ID
new Label("childFlowLabel1", "value1"), // defined by the subtask flow
new Label("childFlowLabel2", "value2"), // defined by the subtask flow
new Label("launchTaskLabel", "launchFoo"), // added by Subtask
new Label("parentFlowLabel1", "launchBar"), // overridden by Subtask
new Label("parentFlowLabel2", "value2") // inherited from the parent flow
);
}
@Test
@LoadFlows({"flows/valids/subflow-parent-no-wait.yaml", "flows/valids/subflow-child-with-output.yaml"})
void subflowOutputWithoutWait() throws QueueException, TimeoutException, InterruptedException {
AtomicReference<Execution> childExecution = new AtomicReference<>();
CountDownLatch countDownLatch = new CountDownLatch(1);
Runnable closing = executionQueue.receive(either -> {
if (either.isLeft() && either.getLeft().getFlowId().equals("subflow-child-with-output") && either.getLeft().getState().isTerminated()) {
childExecution.set(either.getLeft());
countDownLatch.countDown();
}
});
Execution parentExecution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-parent-no-wait");
String childExecutionId = (String) parentExecution.findTaskRunsByTaskId("subflow").getFirst().getOutputs().get("executionId");
assertThat(childExecutionId).isNotBlank();
assertThat(parentExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(parentExecution.getTaskRunList()).hasSize(1);
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
assertThat(childExecution.get().getId()).isEqualTo(childExecutionId);
assertThat(childExecution.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(childExecution.get().getTaskRunList()).hasSize(1);
closing.run();
}
}

View File

@@ -0,0 +1,21 @@
id: flow-concurrency-for-each-item
namespace: io.kestra.tests
inputs:
- id: file
type: FILE
- id: batch
type: INT
tasks:
- id: each
type: io.kestra.plugin.core.flow.ForEachItem
items: "{{ inputs.file }}"
batch:
rows: "{{inputs.batch}}"
namespace: io.kestra.tests
flowId: flow-concurrency-queue
wait: true
transmitFailed: true
inputs:
items: "{{ taskrun.items }}"

View File

@@ -0,0 +1,21 @@
id: foreach-nested
namespace: io.kestra.tests
tasks:
- id: each0
type: io.kestra.plugin.core.flow.ForEach
values: ["l1", "l2"]
tasks:
- id: each1
type: io.kestra.plugin.core.flow.ForEach
concurrencyLimit: 0
values: ["d1", "d2", "d3"]
tasks:
- id: p1
type: io.kestra.plugin.core.debug.Return
format: "{{ parent.taskrun.value }}-{{ taskrun.value }}"
- id: p2
type: io.kestra.plugin.core.debug.Return
format: "{{ outputs.p1[parent.taskrun.value][taskrun.value].value }}"
- id: test
type: io.kestra.plugin.core.debug.Return
format: "{{ outputs.p1 }}"

View File

@@ -0,0 +1,14 @@
id: labels-update-task-deduplicate
namespace: io.kestra.tests
tasks:
- id: from-string
type: io.kestra.plugin.core.execution.Labels
labels: "{ \"fromStringKey\": \"value1\", \"fromStringKey\": \"value2\" }"
- id: from-list
type: io.kestra.plugin.core.execution.Labels
labels:
- key: "fromListKey"
value: "value1"
- key: "fromListKey"
value: "value2"

View File

@@ -0,0 +1,12 @@
id: subflow-child-with-output
namespace: io.kestra.tests
tasks:
- id: return
type: io.kestra.plugin.core.debug.Return
format: "Some value"
outputs:
- id: flow_a_output
type: STRING
value: "{{ outputs.return.value }}"

View File

@@ -0,0 +1,11 @@
id: subflow-inherited-labels-child
namespace: io.kestra.tests
labels:
childFlowLabel1: value1
childFlowLabel2: value2
tasks:
- id: return
type: io.kestra.plugin.core.log.Log
message: "{{ execution.id }}"

View File

@@ -0,0 +1,18 @@
id: subflow-inherited-labels-parent
namespace: io.kestra.tests
labels:
parentFlowLabel1: value1
parentFlowLabel2: value2
tasks:
- id: launch
type: io.kestra.plugin.core.flow.Subflow
namespace: io.kestra.tests
flowId: subflow-inherited-labels-child
wait: true
transmitFailed: true
inheritLabels: true
labels:
launchTaskLabel: launchFoo
parentFlowLabel1: launchBar

View File

@@ -0,0 +1,9 @@
id: subflow-parent-no-wait
namespace: io.kestra.tests
tasks:
- id: subflow
type: io.kestra.plugin.core.flow.Subflow
namespace: io.kestra.tests
flowId: subflow-child-with-output
wait: false

View File

@@ -1,4 +1,4 @@
version=0.24.0-SNAPSHOT
version=1.0.0-SNAPSHOT
org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
org.gradle.parallel=true

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.h2;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.h2.H2Repository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@H2QueueEnabled
public class H2ExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
public H2ExecutionRunningStorage(@Named("executionrunning") H2Repository<ExecutionRunning> repository) {
super(repository);
}
}

View File

@@ -144,4 +144,12 @@ public class H2QueueFactory implements QueueFactoryInterface {
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
return new H2Queue<>(SubflowExecutionEnd.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionRunning> executionRunning() {
return new H2Queue<>(ExecutionRunning.class, applicationContext);
}
}

View File

@@ -0,0 +1,29 @@
CREATE TABLE IF NOT EXISTS execution_running (
"key" VARCHAR(250) NOT NULL PRIMARY KEY,
"value" TEXT NOT NULL,
"tenant_id" VARCHAR(250) GENERATED ALWAYS AS (JQ_STRING("value", '.tenantId')),
"namespace" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.namespace')),
"flow_id" VARCHAR(150) NOT NULL GENERATED ALWAYS AS (JQ_STRING("value", '.flowId'))
);
CREATE INDEX IF NOT EXISTS execution_running__flow ON execution_running ("tenant_id", "namespace", "flow_id");
ALTER TABLE queues ALTER COLUMN "type" ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning'
) NOT NULL

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.mysql;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.mysql.MysqlRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
@Singleton
@MysqlQueueEnabled
public class MysqlExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
public MysqlExecutionRunningStorage(@Named("executionrunning") MysqlRepository<ExecutionRunning> repository) {
super(repository);
}
}

View File

@@ -144,4 +144,12 @@ public class MysqlQueueFactory implements QueueFactoryInterface {
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
return new MysqlQueue<>(SubflowExecutionEnd.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionRunning> executionRunning() {
return new MysqlQueue<>(ExecutionRunning.class, applicationContext);
}
}

View File

@@ -0,0 +1,28 @@
-- Stores one row per currently-running execution of a flow, used to enforce
-- flow-level concurrency limits. Searchable columns are generated from the JSON payload.
CREATE TABLE IF NOT EXISTS execution_running (
`key` VARCHAR(250) NOT NULL PRIMARY KEY,
`value` JSON NOT NULL,
-- tenant_id may be NULL (no NOT NULL constraint, unlike namespace/flow_id)
`tenant_id` VARCHAR(250) GENERATED ALWAYS AS (value ->> '$.tenantId') STORED,
`namespace` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.namespace') STORED NOT NULL,
`flow_id` VARCHAR(150) GENERATED ALWAYS AS (value ->> '$.flowId') STORED NOT NULL,
-- Composite index used by the concurrency-limit check to count running executions per flow.
INDEX ix_flow (tenant_id, namespace, flow_id)
);
-- Redefine the queues type enum: all pre-existing values are kept and the two new
-- message types (FlowInterface and ExecutionRunning) are appended at the end.
ALTER TABLE queues MODIFY COLUMN `type` ENUM(
'io.kestra.core.models.executions.Execution',
'io.kestra.core.models.templates.Template',
'io.kestra.core.models.executions.ExecutionKilled',
'io.kestra.core.runners.WorkerJob',
'io.kestra.core.runners.WorkerTaskResult',
'io.kestra.core.runners.WorkerInstance',
'io.kestra.core.runners.WorkerTaskRunning',
'io.kestra.core.models.executions.LogEntry',
'io.kestra.core.models.triggers.Trigger',
'io.kestra.ee.models.audits.AuditLog',
'io.kestra.core.models.executions.MetricEntry',
'io.kestra.core.runners.WorkerTriggerResult',
'io.kestra.core.runners.SubflowExecutionResult',
'io.kestra.core.server.ClusterEvent',
'io.kestra.core.runners.SubflowExecutionEnd',
'io.kestra.core.models.flows.FlowInterface',
'io.kestra.core.runners.ExecutionRunning'
) NOT NULL;

View File

@@ -0,0 +1 @@
-- Widen the queue offset to BIGINT so the auto-increment cannot overflow the previous INT range.
ALTER TABLE queues MODIFY COLUMN `offset` BIGINT NOT NULL AUTO_INCREMENT;

View File

@@ -0,0 +1,15 @@
package io.kestra.runner.postgres;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.jdbc.runner.AbstractJdbcExecutionRunningStorage;
import io.kestra.repository.postgres.PostgresRepository;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
/**
 * PostgreSQL-specific storage for {@link ExecutionRunning} entries.
 * All behavior lives in {@link AbstractJdbcExecutionRunningStorage}; this subclass only wires
 * the PostgreSQL repository. Registered only when the PostgreSQL queue implementation is enabled.
 */
@Singleton
@PostgresQueueEnabled
public class PostgresExecutionRunningStorage extends AbstractJdbcExecutionRunningStorage {
// "executionrunning" is the named JDBC table config that maps to the execution_running table.
public PostgresExecutionRunningStorage(@Named("executionrunning") PostgresRepository<ExecutionRunning> repository) {
super(repository);
}
}

View File

@@ -144,4 +144,12 @@ public class PostgresQueueFactory implements QueueFactoryInterface {
public QueueInterface<SubflowExecutionEnd> subflowExecutionEnd() {
return new PostgresQueue<>(SubflowExecutionEnd.class, applicationContext);
}
@Override
@Singleton
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
@Bean(preDestroy = "close")
public QueueInterface<ExecutionRunning> executionRunning() {
return new PostgresQueue<>(ExecutionRunning.class, applicationContext);
}
}

View File

@@ -0,0 +1,11 @@
-- Stores one row per currently-running execution of a flow, used to enforce
-- flow-level concurrency limits. Searchable columns are generated from the JSONB payload.
CREATE TABLE IF NOT EXISTS execution_running (
key VARCHAR(250) NOT NULL PRIMARY KEY,
value JSONB NOT NULL,
-- tenant_id may be NULL (no NOT NULL constraint, unlike namespace/flow_id)
tenant_id VARCHAR(250) GENERATED ALWAYS AS (value ->> 'tenantId') STORED,
namespace VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'namespace') STORED,
flow_id VARCHAR(150) NOT NULL GENERATED ALWAYS AS (value ->> 'flowId') STORED
);
-- Composite index used by the concurrency-limit check to count running executions per flow.
CREATE INDEX IF NOT EXISTS execution_running__flow ON execution_running (tenant_id, namespace, flow_id);
-- Register the new queue message type (IF NOT EXISTS keeps the migration idempotent).
ALTER TYPE queue_type ADD VALUE IF NOT EXISTS 'io.kestra.core.runners.ExecutionRunning';

View File

@@ -0,0 +1,3 @@
-- Widen the queues offset sequence data type to BIGINT.
ALTER SEQUENCE queues_offset_seq AS BIGINT;
-- Raise the sequence upper bound to the BIGINT maximum (2^63 - 1).
ALTER SEQUENCE queues_offset_seq MAXVALUE 9223372036854775807;
-- Widen the backing column to match, so the offset cannot overflow the previous type's range.
ALTER TABLE queues ALTER COLUMN "offset" TYPE BIGINT;

View File

@@ -125,6 +125,12 @@ public class JdbcTableConfigsFactory {
return new InstantiableJdbcTableConfig("dashboards", Dashboard.class, "dashboards");
}
@Bean
@Named("executionrunning")
public InstantiableJdbcTableConfig executionRunning() {
return new InstantiableJdbcTableConfig("executionrunning", ExecutionRunning.class, "execution_running");
}
public static class InstantiableJdbcTableConfig extends JdbcTableConfig {
public InstantiableJdbcTableConfig(String name, @Nullable Class<?> cls, String table) {
super(name, cls, table);

View File

@@ -869,8 +869,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
@Override
public List<Execution> lastExecutions(
@Nullable String tenantId,
List<FlowFilter> flows
String tenantId,
@Nullable List<FlowFilter> flows
) {
return this.jdbcRepository
.getDslContextWrapper()
@@ -892,14 +892,19 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
.and(NORMAL_KIND_CONDITION)
.and(field("end_date").isNotNull())
.and(DSL.or(
flows
.stream()
.map(flow -> DSL.and(
field("namespace").eq(flow.getNamespace()),
field("flow_id").eq(flow.getId())
))
.toList()
));
ListUtils.emptyOnNull(flows).isEmpty() ?
DSL.trueCondition()
:
DSL.or(
flows.stream()
.map(flow -> DSL.and(
field("namespace").eq(flow.getNamespace()),
field("flow_id").eq(flow.getId())
))
.toList()
)
)
);
Table<Record2<Object, Integer>> cte = subquery.asTable("cte");

View File

@@ -26,13 +26,11 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.Comparator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -162,12 +160,28 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
return select;
}
private static <T extends Record> SelectConditionStep<T> addFlowId(SelectConditionStep<T> select, String flowId) {
if (flowId != null) {
select = select.and(field("flow_id").eq(flowId));
}
return select;
}
private static <T extends Record> SelectConditionStep<T> addExecutionId(SelectConditionStep<T> select, String executionId) {
if (executionId != null) {
select = select.and(field("execution_id").eq(executionId));
}
return select;
}
@Override
public Flux<LogEntry> findAsync(
@Nullable String tenantId,
@Nullable String namespace,
@Nullable Level minLevel,
ZonedDateTime startDate
@Nullable String tenantId,
@Nullable String namespace,
@Nullable String flowId,
@Nullable String executionId,
@Nullable Level minLevel,
ZonedDateTime startDate
){
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
@@ -181,6 +195,8 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION);
addNamespace(select, namespace);
addFlowId(select, flowId);
addExecutionId(select, executionId);
addMinLevel(select, minLevel);
select = select.and(field("timestamp").greaterThan(startDate.toOffsetDateTime()));
@@ -217,24 +233,6 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
}), FluxSink.OverflowStrategy.BUFFER);
}
private List<LogStatistics> fillDate(List<LogStatistics> result, ZonedDateTime startDate, ZonedDateTime endDate) {
DateUtils.GroupType groupByType = DateUtils.groupByType(Duration.between(startDate, endDate));
if (groupByType.equals(DateUtils.GroupType.MONTH)) {
return fillDate(result, startDate, endDate, ChronoUnit.MONTHS, "YYYY-MM");
} else if (groupByType.equals(DateUtils.GroupType.WEEK)) {
return fillDate(result, startDate, endDate, ChronoUnit.WEEKS, "YYYY-ww");
} else if (groupByType.equals(DateUtils.GroupType.DAY)) {
return fillDate(result, startDate, endDate, ChronoUnit.DAYS, "YYYY-MM-DD");
} else if (groupByType.equals(DateUtils.GroupType.HOUR)) {
return fillDate(result, startDate, endDate, ChronoUnit.HOURS, "YYYY-MM-DD HH");
} else {
return fillDate(result, startDate, endDate, ChronoUnit.MINUTES, "YYYY-MM-DD HH:mm");
}
}
private List<LogStatistics> fillDate(
List<LogStatistics> result,
ZonedDateTime startDate,

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.runners.ExecutionQueued;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.impl.DSL;
@@ -19,9 +20,9 @@ public abstract class AbstractJdbcExecutionQueuedStorage extends AbstractJdbcRep
this.jdbcRepository = jdbcRepository;
}
public void save(ExecutionQueued executionQueued) {
public void save(DSLContext dslContext, ExecutionQueued executionQueued) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executionQueued);
this.jdbcRepository.persist(executionQueued, fields);
this.jdbcRepository.persist(executionQueued, dslContext, fields);
}
public void pop(String tenantId, String namespace, String flowId, Consumer<Execution> consumer) {

View File

@@ -0,0 +1,71 @@
package io.kestra.jdbc.runner;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.runners.ExecutionRunning;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.impl.DSL;
import java.util.Map;
import java.util.Optional;
import java.util.function.*;
public class AbstractJdbcExecutionRunningStorage extends AbstractJdbcRepository {
protected io.kestra.jdbc.AbstractJdbcRepository<ExecutionRunning> jdbcRepository;
public AbstractJdbcExecutionRunningStorage(io.kestra.jdbc.AbstractJdbcRepository<ExecutionRunning> jdbcRepository) {
this.jdbcRepository = jdbcRepository;
}
public void save(DSLContext dslContext, ExecutionRunning executionRunning) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executionRunning);
this.jdbcRepository.persist(executionRunning, dslContext, fields);
}
/**
* Count for running executions then process the count using the consumer function.
* It locked the raw and is wrapped in a transaction so the consumer should use the provided dslContext for any database access.
* <p>
* Note: when there is no execution running, there will be no database locks, so multiple calls will return 0.
* This is only potentially an issue with multiple executor instances when the concurrency limit is set to 1.
*/
public ExecutionRunning countThenProcess(FlowInterface flow, BiFunction<DSLContext, Integer, ExecutionRunning> consumer) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
var dslContext = DSL.using(configuration);
var select = dslContext
.select(AbstractJdbcRepository.field("value"))
.from(this.jdbcRepository.getTable())
.where(this.buildTenantCondition(flow.getTenantId()))
.and(field("namespace").eq(flow.getNamespace()))
.and(field("flow_id").eq(flow.getId()));
Integer count = select.forUpdate().fetch().size();
return consumer.apply(dslContext, count);
});
}
/**
* Delete the execution running corresponding to the given execution.
*/
public void remove(Execution execution) {
this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
var select = DSL
.using(configuration)
.select(AbstractJdbcRepository.field("value"))
.from(this.jdbcRepository.getTable())
.where(buildTenantCondition(execution.getTenantId()))
.and(field("key").eq(IdUtils.fromPartsAndSeparator('|', execution.getTenantId(), execution.getNamespace(), execution.getFlowId(), execution.getId())))
.forUpdate();
Optional<ExecutionRunning> maybeExecution = this.jdbcRepository.fetchOne(select);
maybeExecution.ifPresent(executionRunning -> this.jdbcRepository.delete(executionRunning));
});
}
}

View File

@@ -6,7 +6,6 @@ import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.executions.statistics.ExecutionCount;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.sla.*;
import io.kestra.core.models.tasks.ExecutableTask;
@@ -113,6 +112,10 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Named(QueueFactoryInterface.CLUSTER_EVENT_NAMED)
private Optional<QueueInterface<ClusterEvent>> clusterEventQueue;
@Inject
@Named(QueueFactoryInterface.EXECUTION_RUNNING_NAMED)
private QueueInterface<ExecutionRunning> executionRunningQueue;
@Inject
private RunContextFactory runContextFactory;
@@ -146,6 +149,9 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Inject
private AbstractJdbcExecutionQueuedStorage executionQueuedStorage;
@Inject
private AbstractJdbcExecutionRunningStorage executionRunningStorage;
@Inject
private AbstractJdbcExecutorStateStorage executorStateStorage;
@@ -303,6 +309,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
this.receiveCancellations.addFirst(this.executionRunningQueue.receive(Executor.class, this::executionRunningQueue));
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
ScheduledFuture<?> scheduledDelayFuture = scheduledDelay.scheduleAtFixedRate(
@@ -553,37 +560,22 @@ public class JdbcExecutor implements ExecutorInterface, Service {
monitors.forEach(monitor -> slaMonitorStorage.save(monitor));
}
// queue execution if needed (limit concurrency)
// handle concurrency limit, we need to use a different queue to be sure that execution running
// are processed sequentially so inside a queue with no parallelism
if (execution.getState().getCurrent() == State.Type.CREATED && flow.getConcurrency() != null) {
ExecutionCount count = executionRepository.executionCounts(
flow.getTenantId(),
List.of(new io.kestra.core.models.executions.statistics.Flow(flow.getNamespace(), flow.getId())),
List.of(State.Type.RUNNING, State.Type.PAUSED),
null,
null,
null
).getFirst();
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(executor.getFlow().getTenantId())
.namespace(executor.getFlow().getNamespace())
.flowId(executor.getFlow().getId())
.execution(executor.getExecution())
.concurrencyState(ExecutionRunning.ConcurrencyState.CREATED)
.build();
executor = executorService.checkConcurrencyLimit(executor, flow, execution, count.getCount());
// the execution has been queued, we save the queued execution and stops here
if (executor.getExecutionRunning() != null && executor.getExecutionRunning().getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
executionQueuedStorage.save(ExecutionQueued.fromExecutionRunning(executor.getExecutionRunning()));
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_QUEUED_COUNT_DESCRIPTION, metricRegistry.tags(executor.getExecution())).increment();
return Pair.of(
executor,
executorState
);
}
// the execution has been moved to FAILED or CANCELLED, we stop here
if (executor.getExecution().getState().isTerminated()) {
return Pair.of(
executor,
executorState
);
}
executionRunningQueue.emit(executionRunning);
return Pair.of(
executor,
executorState
);
}
// handle execution changed SLA
@@ -790,7 +782,10 @@ public class JdbcExecutor implements ExecutorInterface, Service {
// move it to the state of the child flow, and merge the outputs.
// This is important to avoid races such as RUNNING that arrives after the first SUCCESS/FAILED.
RunContext runContext = runContextFactory.of(flow, task, current.getExecution(), message.getParentTaskRun());
taskRun = execution.findTaskRunByTaskRunId(message.getParentTaskRun().getId()).withState(message.getState());
taskRun = execution.findTaskRunByTaskRunId(message.getParentTaskRun().getId());
if (taskRun.getState().getCurrent() != message.getState()) {
taskRun = taskRun.withState(message.getState());
}
Map<String, Object> outputs = MapUtils.deepMerge(taskRun.getOutputs(), message.getParentTaskRun().getOutputs());
Variables variables = variablesService.of(StorageContext.forTask(taskRun), outputs);
taskRun = taskRun.withOutputs(variables);
@@ -981,6 +976,37 @@ public class JdbcExecutor implements ExecutorInterface, Service {
}
}
private void executionRunningQueue(Either<ExecutionRunning, DeserializationException> either) {
if (either.isRight()) {
log.error("Unable to deserialize a running execution: {}", either.getRight().getMessage());
return;
}
ExecutionRunning executionRunning = either.getLeft();
FlowInterface flow = flowMetaStore.findByExecution(executionRunning.getExecution()).orElseThrow();
ExecutionRunning processed = executionRunningStorage.countThenProcess(flow, (dslContext, count) -> {
ExecutionRunning computed = executorService.processExecutionRunning(flow, count, executionRunning);
if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.RUNNING && !computed.getExecution().getState().isTerminated()) {
executionRunningStorage.save(dslContext, computed);
} else if (computed.getConcurrencyState() == ExecutionRunning.ConcurrencyState.QUEUED) {
executionQueuedStorage.save(dslContext, ExecutionQueued.fromExecutionRunning(computed));
}
return computed;
});
try {
executionQueue.emit(processed.getExecution());
} catch (QueueException e) {
try {
this.executionQueue.emit(
processed.getExecution().failedExecutionFromExecutor(e).getExecution().withState(State.Type.FAILED)
);
} catch (QueueException ex) {
log.error("Unable to emit the execution {}", processed.getExecution().getId(), ex);
}
}
}
private Executor killingOrAfterKillState(final String executionId, Optional<State.Type> afterKillState) {
return executionRepository.lock(executionId, pair -> {
Execution currentExecution = pair.getLeft();
@@ -1083,6 +1109,11 @@ public class JdbcExecutor implements ExecutorInterface, Service {
slaMonitorStorage.purge(executor.getExecution().getId());
}
// purge execution running
if (executor.getFlow().getConcurrency() != null) {
executionRunningStorage.remove(execution);
}
// check if there exist a queued execution and submit it to the execution queue
if (executor.getFlow().getConcurrency() != null && executor.getFlow().getConcurrency().getBehavior() == Concurrency.Behavior.QUEUE) {
executionQueuedStorage.pop(executor.getFlow().getTenantId(),

View File

@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@KestraTest
abstract public class JdbcQueueTest {
@@ -54,7 +55,7 @@ abstract public class JdbcQueueTest {
flowQueue.emit(builder("io.kestra.f1"));
countDownLatch.await(5, TimeUnit.SECONDS);
assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
receive.blockLast();
assertThat(countDownLatch.getCount()).isEqualTo(0L);
@@ -75,7 +76,7 @@ abstract public class JdbcQueueTest {
flowQueue.emit("consumer_group", builder("io.kestra.f1"));
countDownLatch.await(5, TimeUnit.SECONDS);
assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
receive.blockLast();
assertThat(countDownLatch.getCount()).isEqualTo(0L);
@@ -83,52 +84,48 @@ abstract public class JdbcQueueTest {
@Test
void withType() throws InterruptedException, QueueException {
// first one
flowQueue.emit(builder("io.kestra.f1"));
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux<FlowInterface> receive = TestsUtils.receive(flowQueue, Indexer.class, either -> {
countDownLatch.countDown();
});
countDownLatch.await(5, TimeUnit.SECONDS);
// first one
flowQueue.emit(builder("io.kestra.f1"));
assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
assertThat(receive.blockLast().getNamespace()).isEqualTo("io.kestra.f1");
// second one only
flowQueue.emit(builder("io.kestra.f2"));
CountDownLatch countDownLatch2 = new CountDownLatch(1);
receive = TestsUtils.receive(flowQueue, Indexer.class, either -> {
countDownLatch2.countDown();
});
countDownLatch2.await(5, TimeUnit.SECONDS);
// second one only
flowQueue.emit(builder("io.kestra.f2"));
assertTrue(countDownLatch2.await(5, TimeUnit.SECONDS));
assertThat(receive.blockLast().getNamespace()).isEqualTo("io.kestra.f2");
}
// FIXME
@Test
void withGroupAndType() throws InterruptedException, QueueException {
// first one
flowQueue.emit("consumer_group", builder("io.kestra.f1"));
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux<FlowInterface> receive = TestsUtils.receive(flowQueue, "consumer_group", Indexer.class, either -> {
countDownLatch.countDown();
});
countDownLatch.await(5, TimeUnit.SECONDS);
// first one
flowQueue.emit("consumer_group", builder("io.kestra.f1"));
assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
assertThat(receive.blockLast().getNamespace()).isEqualTo("io.kestra.f1");
// second one only
flowQueue.emit("consumer_group", builder("io.kestra.f2"));
CountDownLatch countDownLatch2 = new CountDownLatch(1);
receive = TestsUtils.receive(flowQueue, "consumer_group", Indexer.class, either -> {
countDownLatch2.countDown();
});
countDownLatch2.await(5, TimeUnit.SECONDS);
// second one only
flowQueue.emit("consumer_group", builder("io.kestra.f2"));
assertTrue(countDownLatch2.await(5, TimeUnit.SECONDS));
assertThat(receive.blockLast().getNamespace()).isEqualTo("io.kestra.f2");
}

6
package-lock.json generated Normal file
View File

@@ -0,0 +1,6 @@
{
"name": "kestra",
"lockfileVersion": 3,
"requires": true,
"packages": {}
}

View File

@@ -32,7 +32,7 @@ dependencies {
// we define cloud bom here for GCP, Azure and AWS so they are aligned for all plugins that use them (secret, storage, oss and ee plugins)
api platform('com.google.cloud:libraries-bom:26.64.0')
api platform("com.azure:azure-sdk-bom:1.2.36")
api platform('software.amazon.awssdk:bom:2.32.6')
api platform('software.amazon.awssdk:bom:2.32.11')
constraints {
@@ -69,6 +69,8 @@ dependencies {
// we need at least 0.14, it could be removed when Micronaut contains a recent only version in their BOM
api "io.micrometer:micrometer-core:1.15.2"
// We need at least 6.17, it could be removed when Micronaut contains a recent only version in their BOM
api "io.micronaut.openapi:micronaut-openapi-bom:6.17.3"
// Other libs
api("org.projectlombok:lombok:1.18.38")
@@ -78,7 +80,7 @@ dependencies {
api group: 'org.slf4j', name: 'jcl-over-slf4j', version: slf4jVersion
api group: 'org.fusesource.jansi', name: 'jansi', version: '2.4.2'
api group: 'com.devskiller.friendly-id', name: 'friendly-id', version: '1.1.0'
api group: 'net.thisptr', name: 'jackson-jq', version: '1.3.0'
api group: 'net.thisptr', name: 'jackson-jq', version: '1.4.0'
api group: 'com.google.guava', name: 'guava', version: '33.4.8-jre'
api group: 'commons-io', name: 'commons-io', version: '2.20.0'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.18.0'
@@ -103,7 +105,7 @@ dependencies {
api group: 'com.github.victools', name: 'jsonschema-module-swagger-2', version: jsonschemaVersion
api 'com.h2database:h2:2.3.232'
api 'com.mysql:mysql-connector-j:9.3.0'
api 'org.postgresql:postgresql:42.7.6'
api 'org.postgresql:postgresql:42.7.7'
api 'com.github.docker-java:docker-java:3.5.3'
api 'com.github.docker-java:docker-java-transport-httpclient5:3.5.3'
api (group: 'org.opensearch.client', name: 'opensearch-java', version: "$opensearchVersion")
@@ -114,7 +116,7 @@ dependencies {
api "org.xhtmlrenderer:flying-saucer-pdf:$flyingSaucerVersion"
api group: 'jakarta.mail', name: 'jakarta.mail-api', version: '2.1.3'
api group: 'org.eclipse.angus', name: 'jakarta.mail', version: '2.0.3'
api group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.2.1'
api group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.2.2'
api group: 'de.siegmar', name: 'fastcsv', version: '4.0.0'
// Json Diff
api group: 'com.github.java-json-tools', name: 'json-patch', version: '1.13'

View File

@@ -14,5 +14,5 @@ public class Cpu {
title = "The maximum amount of CPU resources a container can use.",
description = "Make sure to set that to a numeric value e.g. `cpus: \"1.5\"` or `cpus: \"4\"` or For instance, if the host machine has two CPUs and you set `cpus: \"1.5\"`, the container is guaranteed **at most** one and a half of the CPUs."
)
private Property<Long> cpus;
private Property<Double> cpus;
}

View File

@@ -765,7 +765,8 @@ public class Docker extends TaskRunner<Docker.DockerTaskRunnerDetailResult> {
}
if (this.getCpu() != null && this.getCpu().getCpus() != null) {
hostConfig.withCpuQuota(runContext.render(this.getCpu().getCpus()).as(Long.class).orElseThrow() * 10000L);
Double cpuValue = runContext.render(this.getCpu().getCpus()).as(Double.class).orElseThrow();
hostConfig.withNanoCPUs((long)(cpuValue * 1_000_000_000L));
}
if (this.getMemory() != null) {

View File

@@ -2,15 +2,18 @@ package io.kestra.plugin.scripts.runner.docker;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.runners.AbstractTaskRunnerTest;
import io.kestra.core.models.tasks.runners.TaskCommands;
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.plugin.scripts.exec.scripts.runners.CommandsWrapper;
import org.assertj.core.api.Assertions;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.hamcrest.Matchers.containsString;
class DockerTest extends AbstractTaskRunnerTest {
@Override
@@ -37,4 +40,30 @@ class DockerTest extends AbstractTaskRunnerTest {
assertThat(result.getExitCode()).isZero();
Assertions.assertThat(result.getLogConsumer().getStdOutCount()).isEqualTo(1);
}
@Test
void shouldSetCorrectCPULimitsInContainer() throws Exception {
var runContext = runContext(this.runContextFactory);
var cpuConfig = Cpu.builder()
.cpus(Property.ofValue(1.5))
.build();
var docker = Docker.builder()
.image("rockylinux:9.3-minimal")
.cpu(cpuConfig)
.build();
var taskCommands = new CommandsWrapper(runContext).withCommands(Property.ofValue(List.of(
"/bin/sh", "-c",
"CPU_LIMIT=$(cat /sys/fs/cgroup/cpu.max || cat /sys/fs/cgroup/cpu/cpu.cfs_quota_us) && " +
"echo \"::{\\\"outputs\\\":{\\\"cpuLimit\\\":\\\"$CPU_LIMIT\\\"}}::\""
)));
var result = docker.run(runContext, taskCommands, Collections.emptyList());
assertThat(result).isNotNull();
assertThat(result.getExitCode()).isZero();
MatcherAssert.assertThat((String) result.getLogConsumer().getOutputs().get("cpuLimit"), containsString("150000"));
assertThat(result.getLogConsumer().getStdOutCount()).isEqualTo(1);
}
}

View File

@@ -116,7 +116,7 @@ class LogConsumerTest {
Collections.emptyList()
);
Await.until(() -> logs.size() >= 10, null, Duration.ofSeconds(5));
Await.until(() -> logs.size() >= 10, null, Duration.ofSeconds(10));
receive.blockLast();
assertThat(logs.stream().filter(m -> m.getLevel().equals(Level.INFO)).count()).isEqualTo(1L);

14
ui/package-lock.json generated
View File

@@ -10,7 +10,7 @@
"hasInstallScript": true,
"dependencies": {
"@js-joda/core": "^5.6.5",
"@kestra-io/ui-libs": "^0.0.224",
"@kestra-io/ui-libs": "^0.0.232",
"@vue-flow/background": "^1.3.2",
"@vue-flow/controls": "^1.1.2",
"@vue-flow/core": "^1.45.0",
@@ -1792,9 +1792,9 @@
}
},
"node_modules/@eslint/plugin-kit": {
"version": "0.3.3",
"resolved": "https://registry.npmjs.org/@eslint/plugin-kit/-/plugin-kit-0.3.3.tgz",
"integrity": "sha512-1+WqvgNMhmlAambTvT3KPtCl/Ibr68VldY2XY40SL1CE0ZXiakFR/cbTspaF5HsnpDMvcYYoJHfl4980NBjGag==",
"version": "0.3.4",
"resolved": "https://registry.npmjs.org/@eslint/plugin-kit/-/plugin-kit-0.3.4.tgz",
"integrity": "sha512-Ul5l+lHEcw3L5+k8POx6r74mxEYKG5kOb6Xpy2gCRW6zweT6TEhAf8vhxGgjhqrd/VO/Dirhsb+1hNpD1ue9hw==",
"dev": true,
"license": "Apache-2.0",
"dependencies": {
@@ -3133,9 +3133,9 @@
"license": "BSD-3-Clause"
},
"node_modules/@kestra-io/ui-libs": {
"version": "0.0.224",
"resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.224.tgz",
"integrity": "sha512-upEsKh8rfonNGW+EvA+ql2DaDc6umBX96xWb49lufmQGpm8xMEO+hhWbBRRXU+7egTYJW/6yaFPShNXWddHB4Q==",
"version": "0.0.232",
"resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.232.tgz",
"integrity": "sha512-4Z1DNxWEZSEEy2Tv63uNf2remxb/IqVUY01/qCaeYjLcp5axrS7Dn43N8DspA4EPdlhe4JFq2RhG13Pom+JDQA==",
"dependencies": {
"@nuxtjs/mdc": "^0.16.1",
"@popperjs/core": "^2.11.8",

View File

@@ -24,7 +24,7 @@
},
"dependencies": {
"@js-joda/core": "^5.6.5",
"@kestra-io/ui-libs": "^0.0.224",
"@kestra-io/ui-libs": "^0.0.232",
"@vue-flow/background": "^1.3.2",
"@vue-flow/controls": "^1.1.2",
"@vue-flow/core": "^1.45.0",
@@ -149,7 +149,7 @@
"@popperjs/core": "npm:@sxzz/popperjs-es@^2.11.7"
},
"el-table-infinite-scroll": {
"vue": "$vue"
"vue": "^3.5.18"
},
"storybook": "$storybook"
},

View File

@@ -254,6 +254,7 @@ The table below lists common Pebble expressions and functions.
| `{{ "apple" \| upper \| abbreviate(3) }}` | Chains multiple filters together. |
| `{{ now(timeZone='Europe/Paris') }}` | Returns the current datetime in a specific timezone. |
| `{{ now(format='sql_milli') }}` | Returns the current datetime in a specific format. |
| `{{ 1378653552123 \| date(format="iso_milli", timeZone="Europe/Paris") }}` | Format a given timestamp to datetime in a specific format and timezone. |
| `{% macro input(type='text', name, value) %} ... {% endmacro %}` | Macro with default argument values. |
| `{# THIS IS A COMMENT #}` | Adds a comment that won't appear in the output. |
| `{{ foo.bar }}` | Accesses a child attribute of a variable. |

View File

@@ -163,16 +163,11 @@
},
getTabClasses(tab) {
const isEnterpriseTab = tab.locked;
const isGanttTab = tab.name === "gantt";
const ROUTES = ["/flows/edit/", "/namespaces/edit/"];
const EDIT_ROUTES = ROUTES.some(route => this.$route.path.startsWith(route));
const isOverviewTab = EDIT_ROUTES && tab.title === "Overview";
return {
"container": !isEnterpriseTab && !isOverviewTab,
"mt-4": !isEnterpriseTab && !isOverviewTab,
"px-0": isEnterpriseTab && isOverviewTab,
"gantt-container": isGanttTab
"container": !isEnterpriseTab,
"mt-4": !isEnterpriseTab,
"px-0": isEnterpriseTab,
};
},
},

View File

@@ -339,6 +339,12 @@
}
});
},
triggerLoadDataAfterBulkEditAction() {
this.loadData();
setTimeout(() => this.loadData(), 200);
setTimeout(() => this.loadData(), 1000);
setTimeout(() => this.loadData(), 5000);
},
async unlock() {
const namespace = this.triggerToUnlock.namespace;
const flowId = this.triggerToUnlock.flowId;
@@ -391,10 +397,10 @@
this.triggerStore.update({...trigger, disabled: !value})
.then(updatedTrigger => {
this.triggers = this.triggers.map(t => {
const triggerContextMatches = t.triggerContext &&
const triggerContextMatches = t.triggerContext &&
t.triggerContext.flowId === updatedTrigger.flowId &&
t.triggerContext.triggerId === updatedTrigger.triggerId;
if (triggerContextMatches) {
return {triggerContext: updatedTrigger, abstractTrigger: t.abstractTrigger};
}
@@ -404,7 +410,7 @@
},
genericConfirmAction(toast, queryAction, byIdAction, success, data) {
this.$toast().confirm(
this.$t(toast, {"count": this.queryBulkAction ? this.total : this.selection.length}),
this.$t(toast, {"count": this.queryBulkAction ? this.total : this.selection.length}) + ". " + this.$t("bulk action async warning"),
() => this.genericConfirmCallback(queryAction, byIdAction, success, data),
() => {
}
@@ -432,7 +438,7 @@
.then(data => {
this.$toast().success(this.$t(success, {count: data.count}));
this.toggleAllUnselected();
this.loadData();
this.triggerLoadDataAfterBulkEditAction();
})
} else {
const selection = this.selection;
@@ -442,7 +448,7 @@
.then(data => {
this.$toast().success(this.$t(success, {count: data.count}));
this.toggleAllUnselected();
this.loadData();
this.triggerLoadDataAfterBulkEditAction();
}).catch(e => {
this.$toast().error(e?.invalids.map(exec => {
return {message: this.$t(exec.message, {triggers: exec.invalidValue})}

View File

@@ -0,0 +1,89 @@
<template>
<div class="ai-trigger-box" v-if="show">
<el-button
v-if="enabled && !opened"
class="ai-trigger-button"
:icon="AiIcon"
@click="handleClick"
>
{{ t("ai.flow.title") }}
</el-button>
</div>
</template>
<script lang="ts" setup>
import {useI18n} from "vue-i18n";
import AiIcon from "./AiIcon.vue";
interface AITriggerButtonProps {
show: boolean;
enabled: boolean;
opened: boolean;
}
interface AITriggerButtonEmits {
(event: "click"): void;
}
const {t} = useI18n();
withDefaults(defineProps<AITriggerButtonProps>(), {
show: false,
enabled: false,
opened: false,
});
const emit = defineEmits<AITriggerButtonEmits>();
function handleClick(): void {
emit("click");
}
</script>
<style scoped lang="scss">
.ai-trigger-box {
--border-angle: 0turn;
--main-bg: conic-gradient(from calc(var(--border-angle) + 50.37deg) at 50% 50%, #3991FF 0deg, #8C4BFF 124.62deg, #A396FF 205.96deg, #3991FF 299.42deg, #E0E0FF 342.69deg, #3991FF 360deg);
--gradient-border: conic-gradient(from calc(var(--border-angle) + 50.37deg) at 50% 50%, #3991FF 0deg, #8C4BFF 124.62deg, #A396FF 205.96deg, #3991FF 299.42deg, #E0E0FF 342.69deg, #3991FF 360deg);
display: flex;
flex-direction: column;
align-items: end;
gap: 0.5rem;
margin-top: 0.5rem;
border: solid 1px transparent;
border-radius: 3rem;
background:
var(--main-bg) padding-box,
var(--gradient-border) border-box,
var(--main-bg) border-box;
background-position: center center;
animation: bg-spin 3s linear infinite;
@keyframes bg-spin {
to {
--border-angle: 1turn;
}
}
.ai-trigger-button {
display: inline-flex;
align-items: center;
gap: 6px;
background-color: var(--ks-button-background-secondary);
color: var(--ks-content-primary);
box-shadow: 0px 4px 4px 0px #00000040;
font-size: 12px;
font-weight: 700;
border: none;
border-radius: 3rem;
}
}
@property --border-angle {
syntax: "<angle>";
inherits: true;
initial-value: 0turn;
}
</style>

View File

@@ -5,10 +5,7 @@
<style scoped lang="scss">
.icon {
height: 20px;
width: 20px;
display: inline-flex;
align-self: center;
justify-self: center;
min-width: 20px;
background: center url("../../assets/icons/ai-agent.svg#file");
html.light & {

View File

@@ -1,6 +1,6 @@
<template>
<div @click="handleClick" class="d-flex my-2 p-2 rounded element" :class="{'moved': moved}">
<div class="me-2 icon">
<div v-if="props.parentPathComplete !== 'inputs'" class="me-2 icon">
<TaskIcon :cls="element.type" :icons only-icon />
</div>
@@ -85,6 +85,7 @@
<style scoped lang="scss">
@import "../../styles/code.scss";
@import "@kestra-io/ui-libs/src/scss/_color-palette";
.element {
cursor: pointer;
@@ -107,7 +108,8 @@
}
.playground-run-task{
background-color: blue;
color: $base-white;
background-color: $base-blue-400;
height: 16px;
width: 16px;
font-size: 4px;

View File

@@ -30,7 +30,7 @@
</template>
<script setup lang="ts">
import {onMounted, computed, inject, ref, provide} from "vue";
import {onMounted, computed, inject, ref, provide, onActivated} from "vue";
import {useI18n} from "vue-i18n";
import {useStore} from "vuex";
import {usePluginsStore} from "../../../stores/plugins";
@@ -73,6 +73,10 @@
return !complexObject
}
onActivated(() => {
pluginsStore.updateDocumentation();
});
function onTaskUpdateField(key: string, val: any) {
const realValue = val === null || val === undefined ? undefined :
// allow array to be created with null values (specifically for metadata)
@@ -160,11 +164,8 @@
task: parsedFlow.value,
})
const fieldsFromSchemaTop = computed(() => MAIN_KEYS.map(key => getFieldFromKey(key, "main")))
const fieldsFromSchemaRest = computed(() => {
return Object.keys(pluginsStore.flowRootProperties ?? {})
.filter((key) => !MAIN_KEYS.includes(key) && !HIDDEN_FIELDS.includes(key))

View File

@@ -1,7 +1,7 @@
<template>
<Header v-if="header" :dashboard />
<section id="filter">
<section id="filter" :class="{filterPadding: padding}">
<KestraFilter
:prefix="`dashboard__${dashboard.id}`"
:language
@@ -14,11 +14,11 @@
/>
</section>
<Sections :key :dashboard :charts :show-default="dashboard.id === 'default'" padding />
<Sections :key :dashboard :charts :show-default="dashboard.id === 'default'" :padding="padding" />
</template>
<script setup lang="ts">
import {computed, onBeforeMount, ref} from "vue";
import {computed, onBeforeMount, ref, watch} from "vue";
import type {Dashboard, Chart} from "./composables/useDashboards";
import {ALLOWED_CREATION_ROUTES, getDashboard, processFlowYaml} from "./composables/useDashboards";
@@ -60,6 +60,8 @@
isNamespace: {type: Boolean, default: false},
});
const padding = computed(() => !props.isFlow && !props.isNamespace);
const dashboard = ref<Dashboard>({id: "", charts: []});
const charts = ref<Chart[]>([]);
@@ -102,12 +104,14 @@
if (props.isFlow && ID === "default") load("default", processFlowYaml(YAML_FLOW, route.params.namespace as string, route.params.id as string));
else if (props.isNamespace && ID === "default") load("default", YAML_NAMESPACE);
});
watch(route, async (_) => refreshCharts());
</script>
<style scoped lang="scss">
@import "@kestra-io/ui-libs/src/scss/variables";
section#filter {
.filterPadding {
margin: 2rem 0.25rem 0;
padding: 0 2rem;
}

View File

@@ -131,7 +131,7 @@ export function useChartGenerator(props: {chart: Chart; filters: string[]; showD
const data = ref();
const generate = async (id: string, pagination?: { pageNumber: number; pageSize: number }) => {
const filters = props.filters.concat(decodeSearchParams(route.query, undefined, []) ?? []);
const parameters: Parameters = {...(pagination ?? {}), ...(filters ?? {})};
const parameters: Parameters = {...(pagination ?? {}), filters: (filters ?? {})};
if (!props.showDefault) {
data.value = await dashboardStore.generate(id, props.chart.id, parameters);

View File

@@ -13,6 +13,7 @@
v-bind="props"
:title="props.data.flowId"
:state="props.data.state"
:icon-component="iconVNode"
@expand-dependencies="expand"
@mouseover="onMouseOver"
@mouseleave="onMouseLeave"
@@ -35,7 +36,7 @@
</template>
<script setup>
import {ref, onMounted, inject, nextTick, onBeforeUnmount, watch} from "vue";
import {ref, onMounted, inject, nextTick, onBeforeUnmount, watch, h, computed} from "vue";
import {useRoute, useRouter} from "vue-router";
import {
VueFlow,
@@ -51,6 +52,20 @@
import {cssVariable} from "@kestra-io/ui-libs";
import BasicNode from "@kestra-io/ui-libs/src/components/nodes/BasicNode.vue";
import TaskIcon from "@kestra-io/ui-libs/src/components/misc/TaskIcon.vue";
const icon = computed(() => {
const GRAY = "#2f3342";
return window.btoa(`
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="25" viewBox="0 0 24 25" fill="none">
<path fill-rule="evenodd" clip-rule="evenodd"
d="M4.34546 9.63757C4.74074 10.5277 5.31782 11.3221 6.03835 11.9681L7.03434 10.8209C6.4739 10.3185 6.02504 9.70059 5.71758 9.00824C5.41012 8.3159 5.25111 7.56496 5.25111 6.80532C5.25111 6.04568 5.41012 5.29475 5.71758 4.6024C6.02504 3.91006 6.4739 3.29216 7.03434 2.78977L6.03835 1.64258C5.31782 2.28851 4.74074 3.08293 4.34546 3.97307C3.95019 4.86321 3.74575 5.82867 3.74575 6.80532C3.74575 7.78197 3.95019 8.74744 4.34546 9.63757ZM16.955 4.38931C17.4802 3.97411 18.1261 3.74777 18.7913 3.74576C19.5894 3.74576 20.3547 4.06807 20.919 4.64177C21.4833 5.21548 21.8004 5.9936 21.8004 6.80494C21.8004 7.61628 21.4833 8.3944 20.919 8.96811C20.3547 9.54181 19.5894 9.86412 18.7913 9.86412C18.2559 9.86126 17.7312 9.71144 17.2725 9.43048L12.3325 14.4529L11.2688 13.3715L16.2088 8.34906C16.0668 8.10583 15.9592 7.84348 15.8891 7.56973H11.2688V6.04014H15.8891C16.055 5.38511 16.4298 4.80451 16.955 4.38931ZM17.9555 8.07674C18.2029 8.24482 18.4938 8.33453 18.7913 8.33453C19.1902 8.33412 19.5727 8.17284 19.8548 7.88607C20.1368 7.59931 20.2955 7.21049 20.2959 6.80494C20.2959 6.50241 20.2076 6.20668 20.0423 5.95514C19.877 5.70361 19.642 5.50756 19.3671 5.39178C19.0922 5.27601 18.7897 5.24572 18.4978 5.30474C18.206 5.36376 17.9379 5.50944 17.7275 5.72336C17.5171 5.93727 17.3738 6.20982 17.3157 6.50653C17.2577 6.80324 17.2875 7.11079 17.4014 7.39029C17.5152 7.66978 17.7081 7.90867 17.9555 8.07674ZM3.74621 15.2177V16.7473H7.19606L2.2417 21.7842L3.30539 22.8656L8.25975 17.8287V21.336H9.76427V15.2177H3.74621ZM15.7823 18.2769H12.7733V19.8064H15.7823V22.1008H21.8004V15.9825H15.7823V18.2769ZM17.2868 20.5712V17.5121H20.2959V20.5712H17.2868ZM8.02885 9.67292C7.62863 9.31407 7.30809 8.87275 7.08853 8.37827C6.86897 7.88378 6.75542 7.34747 6.75542 6.80494C6.75542 6.26241 6.86897 5.72609 7.08853 5.23161C7.30809 4.73713 7.62863 4.29581 8.02885 3.93696L9.02484 5.08415C8.78458 5.29946 8.59215 5.5643 8.46034 5.86106C8.32853 6.15782 8.26035 6.47971 8.26035 6.80532C8.26035 7.13094 8.32853 7.45282 8.46034 
7.74958C8.59215 8.04634 8.78458 8.31118 9.02484 8.52649L8.02885 9.67292Z"
fill="${GRAY}" />
</svg>
`);
});
const iconVNode = h(TaskIcon, {customIcon: {icon: icon.value}});
import {apiUrl} from "override/utils/route";
import {linkedElements} from "../../utils/vueFlow";

View File

@@ -1,24 +1,22 @@
<template>
<div class="execution-pending">
<EmptyTemplate class="queued">
<img src="../../assets/queued_visual.svg" alt="Queued Execution">
<h5 class="mt-4 fw-bold">
{{ $t('execution_status') }}
<span
class="ms-2 px-2 py-1 rounded fs-7 fw-normal"
:style="getStyle(execution.state.current)"
>
{{ execution.state.current }}
</span>
</h5>
<p class="mt-4 mb-0">
{{ $t('no_tasks_running') }}
</p>
<p>
{{ $t('execution_starts_progress') }}
</p>
</EmptyTemplate>
</div>
<EmptyTemplate class="queued">
<img src="../../assets/queued_visual.svg" alt="Queued Execution">
<h5 class="mt-4 fw-bold">
{{ $t('execution_status') }}
<span
class="ms-2 px-2 py-1 rounded fs-7 fw-normal"
:style="getStyle(execution.state.current)"
>
{{ execution.state.current }}
</span>
</h5>
<p class="mt-4 mb-0">
{{ $t('no_tasks_running') }}
</p>
<p>
{{ $t('execution_starts_progress') }}
</p>
</EmptyTemplate>
</template>
<script setup>

View File

@@ -80,13 +80,6 @@
this.executionsStore.followExecution(this.$route.params, this.$t);
},
getTabs() {
},
},
computed: {
...mapState("auth", ["user"]),
...mapStores(useCoreStore, useExecutionsStore),
tabs() {
return [
{
name: undefined,
@@ -135,6 +128,13 @@
locked: true
}
];
}
},
computed: {
...mapState("auth", ["user"]),
...mapStores(useCoreStore, useExecutionsStore),
tabs() {
return this.getTabs();
},
routeInfo() {
const ns = this.$route.params.namespace;

View File

@@ -260,7 +260,7 @@
class-name="shrink"
>
<template #default="scope">
<code>{{ scope.row.flowRevision }}</code>
<code class="code-text">{{ scope.row.flowRevision }}</code>
</template>
</el-table-column>
@@ -293,7 +293,7 @@
</el-tooltip>
</template>
<template #default="scope">
<code>
<code class="code-text">
{{ scope.row.taskRunList?.slice(-1)[0].taskId }}
{{
scope.row.taskRunList?.slice(-1)[0].attempts?.length > 1 ? `(${scope.row.taskRunList?.slice(-1)[0].attempts.length})` : ""
@@ -625,9 +625,6 @@
}
this.displayColumns = localStorage.getItem("columns_executions")?.split(",")
|| this.optionalColumns.filter(col => col.default).map(col => col.prop);
if (this.isConcurrency) {
this.emitStateCount([State.RUNNING, State.PAUSED])
}
},
computed: {
...mapState("auth", ["user"]),
@@ -796,6 +793,11 @@
queryFilter["filters[flowId][EQUALS]"] = this.flowId;
}
const hasStateFilters = Object.keys(queryFilter).some(key => key.startsWith("filters[state]")) || queryFilter.state;
if (!hasStateFilters && this.statuses?.length > 0) {
queryFilter["filters[state][IN]"] = this.statuses.join(",");
}
return _merge(base, queryFilter)
},
loadData(callback) {
@@ -806,7 +808,11 @@
page: parseInt(this.$route.query.page || this.internalPageNumber),
sort: this.$route.query.sort || "state.startDate:desc",
state: this.$route.query.state ? [this.$route.query.state] : this.statuses
})).finally(callback);
})).then(() => {
if (this.isConcurrency) {
this.emitStateCount();
}
}).finally(callback);
},
durationFrom(item) {
return (+new Date() - new Date(item.state.startDate).getTime()) / 1000
@@ -1070,15 +1076,12 @@
}
})
},
emitStateCount(states) {
this.executionsStore.findExecutions(this.loadQuery({
size: parseInt(this.$route.query.size || this.internalPageSize),
page: parseInt(this.$route.query.page || this.internalPageNumber),
sort: this.$route.query.sort || "state.startDate:desc",
state: states
})).then(() => {
this.$emit("state-count", this.executionsStore.total);
});
emitStateCount() {
const runningCount = this.executionsStore.executions.filter(execution =>
execution.state.current === State.RUNNING
)?.length;
const totalCount = this.executionsStore.total;
this.$emit("state-count", {runningCount, totalCount});
}
},
watch: {
@@ -1119,6 +1122,9 @@
color: #ffb703;
}
}
.code-text {
color: var(--ks-content-primary);
}
</style>
<style lang="scss">

View File

@@ -45,7 +45,7 @@
</el-tooltip>
</el-form-item>
<el-form-item>
<el-button-group class="min-w-auto">
<el-button-group class="ks-b-group">
<restart :execution="executionsStore.execution" class="ms-0" @follow="forwardEvent('follow', $event)" />
<el-button @click="downloadContent()">
<kicon :tooltip="$t('download logs')">
@@ -60,7 +60,7 @@
</el-button-group>
</el-form-item>
<el-form-item>
<el-button-group class="min-w-auto">
<el-button-group class="ks-b-group">
<el-button @click="loadLogs()">
<kicon :tooltip="$t('refresh')">
<refresh />
@@ -361,4 +361,9 @@
align-items: flex-start;
}
}
.ks-b-group {
min-width: auto!important;
max-width: max-content !important;
}
</style>

View File

@@ -37,13 +37,14 @@
</div>
<div class="d-flex flex-column p-3 debug">
<editor
<Editor
ref="debugEditor"
:full-height="false"
:custom-height="20"
:input="true"
:navbar="false"
:model-value="computedDebugValue"
@update:model-value="editorValue = $event"
@confirm="onDebugExpression($event)"
class="w-100"
/>
@@ -53,7 +54,7 @@
:icon="Refresh"
@click="
onDebugExpression(
debugEditor.editor.getValue(),
editorValue.length > 0 ? editorValue : computedDebugValue,
)
"
class="mt-3"
@@ -61,7 +62,7 @@
{{ $t("eval.render") }}
</el-button>
<editor
<Editor
v-if="debugExpression"
:read-only="true"
:input="true"
@@ -98,7 +99,7 @@
<VarValue
v-if="selectedValue && displayVarValue()"
:value="selectedValue.uri ? selectedValue.uri : selectedValue"
:value="selectedValue?.uri ? selectedValue?.uri : selectedValue"
:execution="execution"
/>
</div>
@@ -129,8 +130,9 @@
}>();
const cascader = ref<any>(null);
const debugEditor = ref<any>(null);
const debugEditor = ref<InstanceType<typeof Editor>>();
const selected = ref<string[]>([]);
const editorValue = ref("");
const debugExpression = ref("");
const debugError = ref("");
const debugStackTrace = ref("");
@@ -425,4 +427,4 @@
font-size: var(--el-font-size-base);
}
}
</style>
</style>

View File

@@ -80,6 +80,7 @@
:input="true"
:navbar="false"
:model-value="computedDebugValue"
@update:model-value="editorValue = $event"
@confirm="onDebugExpression($event)"
class="w-100"
/>
@@ -88,8 +89,9 @@
type="primary"
@click="
onDebugExpression(
debugEditor.editor.getValue(),
editorValue.length > 0 ? editorValue : computedDebugValue,
)
"
class="mt-3"
>
@@ -163,8 +165,9 @@
import CopyToClipboard from "../../layout/CopyToClipboard.vue";
import Editor from "../../inputs/Editor.vue";
const editorValue = ref("");
const debugCollapse = ref("");
const debugEditor = ref(null);
const debugEditor = ref<InstanceType<typeof Editor>>();
const debugExpression = ref("");
const computedDebugValue = computed(() => {
const formatTask = (task) => {
@@ -422,7 +425,7 @@
const displayVarValue = () =>
isFile(selectedValue.value) ||
selectedValue.value !== debugExpression.value;
const leftWidth = ref(70);
const startDragging = (event: MouseEvent) => {
const startX = event.clientX;

View File

@@ -1,6 +1,6 @@
<template>
<template v-if="flow.concurrency">
<div v-if="runningCount > 0 || !runningCountSet" :class="{'d-none': !runningCountSet}">
<div v-if="totalCount > 0 || !runningCountSet" :class="{'d-none': !runningCountSet}">
<el-card class="mb-3">
<div class="row mb-3">
<span class="col d-flex align-items-center">
@@ -50,13 +50,20 @@
data() {
return {
runningCount: 0,
totalCount: 0,
runningCountSet: false,
}
},
methods: {
setRunningCount(count) {
this.runningCount = count
this.runningCountSet = true
if (typeof count === "object") {
this.runningCount = count.runningCount;
this.totalCount = count.totalCount;
} else {
this.runningCount = count;
this.totalCount = count;
}
this.runningCountSet = true;
}
},
computed: {
@@ -89,4 +96,8 @@
border-radius: var(--bs-border-radius);
}
}
:deep(.el-card) {
background-color: var(--ks-background-panel);
}
</style>

Some files were not shown because too many files have changed in this diff Show More