Compare commits

...

78 Commits

Author SHA1 Message Date
François Delbrayelle
343c66f125 build(release): only release changed plugins 2025-08-12 11:59:15 +02:00
Loïc Mathieu
577f813eef fix(executions): SLA monitor should take into account restarted executions 2025-08-12 11:46:58 +02:00
Loïc Mathieu
06a9f13676 fix(executions): concurrency limit exceeded when restarting an execution
Fixes #7880
2025-08-12 11:46:58 +02:00
Loïc Mathieu
1fd6e23f96 feat(flows): Flow SLA out of beta
Part-of: https://github.com/kestra-io/kestra-ee/issues/4555
2025-08-12 11:29:32 +02:00
Piyush Bhaskar
9a32780c8c fix(flow): fixes flow deletion inside actions (#10693) 2025-08-12 14:56:31 +05:30
Nicolas K.
af140baa66 Feat/add filters to repositories (#10629)
* wip(repositories): use query filter in the log repository

* feat(repositories): #10628 refactor query builder engine

* fix(repositories): #10628 add sort to findAsych query

* Update core/src/main/java/io/kestra/core/utils/ListUtils.java

Co-authored-by: Loïc Mathieu <loikeseke@gmail.com>

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
Co-authored-by: Loïc Mathieu <loikeseke@gmail.com>
2025-08-12 11:17:47 +02:00
Florian Hussonnois
54b0183b95 fix(system): avoid unsupported type error on ServiceType enum 2025-08-12 10:01:30 +02:00
Loïc Mathieu
64de3d5fa8 fix(executions): correctly fail the request when trying to resume an execution with the wrong inputs
Fixes #9959
2025-08-12 09:39:02 +02:00
Piyush Bhaskar
4c17aadb81 fix(ui): more visible color for deafult edge (#10690) 2025-08-12 12:44:20 +05:30
Piyush Bhaskar
bf424fbf53 fix(core): reduce size of code block text and padding (#10689) 2025-08-12 11:46:52 +05:30
brian.mulier
edcdb88559 fix(dashboard): avoid duplicate dashboard calls + properly refresh dashboards on refresh button + don't discard component entirely on refresh 2025-08-11 22:28:19 +02:00
brian.mulier
9a9d0b995a fix(dashboard): properly use time filters in queries
closes kestra-io/kestra-ee#4389
2025-08-11 22:28:19 +02:00
brian-mulier-p
5c5d313fb0 fix(metrics): restore autocompletion on metrics filter (#10688) 2025-08-11 21:08:56 +02:00
Nicolas K.
dfd4d87867 feat(releases): add test jar to meven central deployment (#10675)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-11 15:56:51 +02:00
Piyush Bhaskar
367d773a86 fix(flows): enable the save and makes tab dirty when have unsaved changes in no code (#10671) 2025-08-11 18:35:56 +05:30
brian.mulier
c819f15c66 tests(core): add a test to taskrunners to ensure it's working multiple times on the same working directory
part of kestra-io/plugin-ee-kubernetes#45
2025-08-11 14:59:15 +02:00
Loïc Mathieu
673b5c994c feat(flows): add upstream dependencies in flow dependencies
Closes #10638
2025-08-11 12:43:33 +02:00
Loïc Mathieu
2acf37e0e6 fix(executions): properly fail the task if it contains unsupported unicode sequence
This occurs in Postgres using the `\u0000` unicode sequence. Postgres refuse to store any JSONB with this sequence as it has no textual representation.
We now properly detect that and fail the task.

Fixes #10326
2025-08-11 11:53:39 +02:00
Ludovic DEHON
0d7fcbb936 build(core): create a docker image for each pull request (#10644)
relate to kestra-io/kestra#10643
2025-08-09 00:18:28 +02:00
Miloš Paunović
42b01d6951 chore(core): reload number of dependencies on flow save action (#10663)
Closes https://github.com/kestra-io/kestra/issues/10484.
2025-08-08 15:11:41 +02:00
Miloš Paunović
9edfb01920 chore(core): uniform dependency table namespace label (#10655) 2025-08-08 13:14:53 +02:00
Miloš Paunović
7813337f48 fix(core): ensure dependency table updates occur after dom is fully rendered (#10654)
Closes https://github.com/kestra-io/kestra/issues/10639.
2025-08-08 12:52:16 +02:00
Miloš Paunović
ea0342f82a refactor(core): remove revision property from flow nodes in dependency graph (#10650)
Related to https://github.com/kestra-io/kestra/issues/10633.
2025-08-08 12:21:01 +02:00
Piyush Bhaskar
ca8f25108e fix(core): update flow store usage. (#10649) 2025-08-08 11:34:09 +02:00
Miloš Paunović
49b6c331a6 chore(core): amend edge color scheme in execution dependency graph (#10648)
Related to https://github.com/kestra-io/kestra/issues/10639.
2025-08-08 11:29:11 +02:00
Miloš Paunović
e409fb7ac0 chore(core): lower the wheel sensitivity on zooming of dependency graph (#10647)
Relates to https://github.com/kestra-io/kestra/issues/10639.
2025-08-08 10:27:51 +02:00
Miloš Paunović
0b64c29794 fix(flows): properly import pinia store into a dependency graph composable (#10646) 2025-08-08 10:25:58 +02:00
Piyush Bhaskar
c4665460aa fix(flows): copy trigger url propely. (#10645) 2025-08-08 12:57:41 +05:30
Barthélémy Ledoux
5423b6e3a7 refactor: move flow store to pinia (#10620) 2025-08-08 09:04:33 +02:00
Vanshika Kumar
114669e1b5 chore(core): add padding around user image in left sidebar (#10553)
Co-authored-by: Vanshika Kumar <vanshika.kumar-ext@infra.market>
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-08-08 08:34:23 +02:00
Loïc Mathieu
d75f0ced38 fix(executions): allow caching tasks that use the 'workingDir' variable
Fixes #10253
2025-08-07 17:26:24 +02:00
brian.mulier
0a788d8429 fix(core): ensure props with defaults are not marked as required in generated doc 2025-08-07 15:07:00 +02:00
brian.mulier
8c25d1bbd7 fix(core): wrong @NotNull import leading to key not being marked as required
closes #9287
2025-08-07 15:07:00 +02:00
YannC
4e2e8f294f fix: avoid calling nextExecutionDate if value is null when resetting trigger (#10547) 2025-08-07 14:51:27 +02:00
Barthélémy Ledoux
2c34804ce2 fix(core): update necessary node viewer in gradle build (#10624) 2025-08-07 13:38:29 +02:00
Piyush Bhaskar
bab4eef790 refactor(namespace): migrate namespace module to pinia (#10571)
* refactor(namespace): migrate namespace module to pinia

* refactor(namespaces): override the store and fix the test

* fix:  test in good way

* refactor: rename action as ee

* refactor: state and action is different

* refactor:  namespaces store in composition  API and composable to use the common state, actions

* fix: export validate
2025-08-07 16:20:51 +05:30
Miloš Paunović
94aa628ac1 feat(core): implement different graph type for dependencies view (#10240)
Closes https://github.com/kestra-io/kestra/issues/5350.
Closes https://github.com/kestra-io/kestra/issues/10446.
Closes https://github.com/kestra-io/kestra/issues/10563.
Closes https://github.com/kestra-io/kestra-ee/issues/3431.
Closes https://github.com/kestra-io/kestra-ee/issues/4509.

Relates to https://github.com/kestra-io/kestra/issues/10484.
Relates to https://github.com/kestra-io/kestra-ee/issues/3550.
2025-08-07 12:12:12 +02:00
Loïc Mathieu
da180fbc00 chore(system): add a note on MapUtils.nestedToFlattenMap() method 2025-08-07 12:01:31 +02:00
Anna Geller
c7bd592bc7 fix(ai-agent): add prompt suggestion 2025-08-07 10:42:35 +02:00
Florian Hussonnois
693d174960 chore(system): provide a more useful Either utility class
Rewrite and add tests to Either class to be a bit
more useable
2025-08-07 10:31:28 +02:00
Florian Hussonnois
8ee492b9c5 fix(system): fix consumer commit on JDBC queue
Ensure that JDBC queue records are committed to the consumer
after processing. This fixes a rare issue where executions could be blocked after a runner crash.
2025-08-07 10:31:17 +02:00
Loïc Mathieu
d6b8ba34ea chore(system): provide a MapUtils.nestedToFlattenMap() method
It will be used to nest a previously flatten map when needed.
2025-08-07 10:00:13 +02:00
dependabot[bot]
08cc853e00 build(deps): bump software.amazon.awssdk.crt:aws-crt
Bumps [software.amazon.awssdk.crt:aws-crt](https://github.com/awslabs/aws-crt-java) from 0.38.7 to 0.38.8.
- [Release notes](https://github.com/awslabs/aws-crt-java/releases)
- [Commits](https://github.com/awslabs/aws-crt-java/compare/v0.38.7...v0.38.8)

---
updated-dependencies:
- dependency-name: software.amazon.awssdk.crt:aws-crt
  dependency-version: 0.38.8
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-07 09:19:01 +02:00
dependabot[bot]
4f68715483 build(deps): bump org.apache.commons:commons-compress
Bumps [org.apache.commons:commons-compress](https://github.com/apache/commons-compress) from 1.27.1 to 1.28.0.
- [Changelog](https://github.com/apache/commons-compress/blob/master/RELEASE-NOTES.txt)
- [Commits](https://github.com/apache/commons-compress/compare/rel/commons-compress-1.27.1...rel/commons-compress-1.28.0)

---
updated-dependencies:
- dependency-name: org.apache.commons:commons-compress
  dependency-version: 1.28.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-07 09:18:02 +02:00
Karthik D
edde1b6730 fix(core): fixes overflow of outputs content
* fix

* fix

* fix: minor tweaks

* fix: scope the style

---------

Co-authored-by: Piyush-r-bhaskar <impiyush0012@gmail.com>
2025-08-07 12:37:44 +05:30
Biplab Bera
399446f52e feat: disabled the preview button in output tabs for zip files (#10535)
Co-authored-by: Piyush Bhaskar <102078527+Piyush-r-bhaskar@users.noreply.github.com>
2025-08-07 11:56:58 +05:30
Florian Hussonnois
c717890fbc fix(build): fix and enhance release-plugins.sh
Skip gradle release when tag already exists
Check for staging files before commiting
2025-08-06 17:17:50 +02:00
Barthélémy Ledoux
5328b0c574 fix(flows): allow date inputs in playground (#10611) 2025-08-06 15:36:29 +02:00
Barthélémy Ledoux
de14cae1f0 fix(flows): playground only clear highlighted lines on leave task (#10612) 2025-08-06 15:36:17 +02:00
Miloš Paunović
d8a3e703e7 feat(core): add animated edges to topology graph (#10616)
Closes kestra-io/kestra#10614.
2025-08-06 14:49:31 +02:00
dependabot[bot]
90659bc320 build(deps): bump com.azure:azure-sdk-bom from 1.2.36 to 1.2.37
Bumps [com.azure:azure-sdk-bom](https://github.com/azure/azure-sdk-for-java) from 1.2.36 to 1.2.37.
- [Release notes](https://github.com/azure/azure-sdk-for-java/releases)
- [Commits](https://github.com/azure/azure-sdk-for-java/compare/azure-sdk-bom_1.2.36...azure-sdk-bom_1.2.37)

---
updated-dependencies:
- dependency-name: com.azure:azure-sdk-bom
  dependency-version: 1.2.37
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 12:55:33 +02:00
dependabot[bot]
37d1d8856e build(deps): bump software.amazon.awssdk:bom from 2.32.11 to 2.32.16
Bumps software.amazon.awssdk:bom from 2.32.11 to 2.32.16.

---
updated-dependencies:
- dependency-name: software.amazon.awssdk:bom
  dependency-version: 2.32.16
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 11:56:59 +02:00
Florian Hussonnois
93a4eb5cbc build: add plugin-datagen to plugin list 2025-08-06 11:11:46 +02:00
Miloš Paunović
de160c8a2d chore(deps): regular dependency update (#10607)
Performing a weekly round of dependency updates in the NPM ecosystem to keep everything up to date.
2025-08-06 10:20:32 +02:00
dependabot[bot]
28458b59eb build(deps): bump com.mysql:mysql-connector-j from 9.3.0 to 9.4.0
Bumps [com.mysql:mysql-connector-j](https://github.com/mysql/mysql-connector-j) from 9.3.0 to 9.4.0.
- [Changelog](https://github.com/mysql/mysql-connector-j/blob/release/9.x/CHANGES)
- [Commits](https://github.com/mysql/mysql-connector-j/compare/9.3.0...9.4.0)

---
updated-dependencies:
- dependency-name: com.mysql:mysql-connector-j
  dependency-version: 9.4.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:50:39 +02:00
dependabot[bot]
2a256d9505 build(deps): bump org.eclipse.angus:jakarta.mail from 2.0.3 to 2.0.4
Bumps org.eclipse.angus:jakarta.mail from 2.0.3 to 2.0.4.

---
updated-dependencies:
- dependency-name: org.eclipse.angus:jakarta.mail
  dependency-version: 2.0.4
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:50:20 +02:00
dependabot[bot]
9008b21007 build(deps): bump com.google.cloud:libraries-bom from 26.64.0 to 26.65.0
Bumps [com.google.cloud:libraries-bom](https://github.com/googleapis/java-cloud-bom) from 26.64.0 to 26.65.0.
- [Release notes](https://github.com/googleapis/java-cloud-bom/releases)
- [Changelog](https://github.com/googleapis/java-cloud-bom/blob/main/release-please-config.json)
- [Commits](https://github.com/googleapis/java-cloud-bom/compare/v26.64.0...v26.65.0)

---
updated-dependencies:
- dependency-name: com.google.cloud:libraries-bom
  dependency-version: 26.65.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:49:35 +02:00
dependabot[bot]
8c13bf6a71 build(deps): bump com.gradleup.shadow from 8.3.8 to 8.3.9
Bumps [com.gradleup.shadow](https://github.com/GradleUp/shadow) from 8.3.8 to 8.3.9.
- [Release notes](https://github.com/GradleUp/shadow/releases)
- [Commits](https://github.com/GradleUp/shadow/compare/8.3.8...8.3.9)

---
updated-dependencies:
- dependency-name: com.gradleup.shadow
  dependency-version: 8.3.9
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:49:06 +02:00
dependabot[bot]
43888cc3dd build(deps): bump actions/download-artifact from 4 to 5
Bumps [actions/download-artifact](https://github.com/actions/download-artifact) from 4 to 5.
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](https://github.com/actions/download-artifact/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/download-artifact
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-06 09:48:47 +02:00
Piyush Bhaskar
c94093d9f6 fix(flows): ensure plugin documentation change on flow switch (#10546)
Co-authored-by: Barthélémy Ledoux <bledoux@kestra.io>
2025-08-05 14:29:36 +05:30
Barthélémy Ledoux
8779dec28a fix(flows): add conditional rendering for restart button based on execution (#10570) 2025-08-05 10:22:13 +02:00
Nicolas K.
41614c3a6e feat(stores): #4353 list all KV for namespace and parent namespaces (#10470)
* feat(stores): #4353 list all KV for namespace and parent namespaces

* feat(stores): #4353 list all KV for namespace and parent namespaces

* feat(stores): #4353 list all KV for namespace and parent namespaces

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-05 09:55:41 +02:00
Barthélémy Ledoux
6b4fdd0688 fix: restore InputForm (#10568) 2025-08-05 09:44:39 +02:00
Loïc Mathieu
0319f3d267 feat(system): set the default number of worker threads to 8x available cpu cores
This is a better default for mixed workloads and provides better tail latency.
This is also what we advise to our customer.
2025-08-05 09:19:14 +02:00
brian.mulier
0b37fe2cb8 fix(namespaces): autocomplete in kv & secrets
related to kestra-io/kestra-ee#4559
2025-08-04 20:29:56 +02:00
brian.mulier
e623dd7729 fix(executions): avoid SSE error in follow execution dependencies
closes #10560
2025-08-04 20:22:32 +02:00
Barthélémy Ledoux
db4f7cb4ff fix(flows)*: load flow for execution needs to be stored most of the time (#10566) 2025-08-04 18:54:01 +02:00
Abhilash T
b14b16db0e fix: Updated InputsForm.vue to clear Radio Button Selection (#9654)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-08-04 16:03:25 +02:00
brian.mulier
77f6cec0e4 fix(executions): restore execution redirect & subflow logs view from parent
closes #10528
closes #10551
2025-08-04 15:46:48 +02:00
Piyush Bhaskar
1748b18d66 chore(core): remove variable and directly assign. (#10554) 2025-08-04 18:45:19 +05:30
Piyush Bhaskar
32f96348c1 fix(core): proper state detection from parsed data (#10527) 2025-08-04 18:41:05 +05:30
Barthélémy Ledoux
07db0a8c80 fix(flows): no-code - when changing type message avoid warning (#10498) 2025-08-04 14:57:28 +02:00
Barthélémy Ledoux
2035fd42c3 refactor: use composition api and ts on revision component (#10529) 2025-08-04 14:56:36 +02:00
Barthélémy Ledoux
2856bf07e8 refactor: move editor from vuex to pinia (#10533)
Co-authored-by: Piyush-r-bhaskar <impiyush0012@gmail.com>
2025-08-04 14:55:55 +02:00
Barthélémy Ledoux
f5327cec33 fix: remove debugging value from playground (#10541) 2025-08-04 14:54:45 +02:00
Anna Geller
42955936b2 fix: demo no longer exists 2025-08-04 14:38:13 +02:00
Miloš Paunović
771b98e023 chore(namespaces): add the needed prop for loading all namespaces inside a selector (#10544) 2025-08-04 12:44:38 +02:00
Miloš Paunović
b80e8487e3 fix(namespaces): amend problems with namespace secrets and kv pairs (#10543)
Closes https://github.com/kestra-io/kestra-ee/issues/4584.
2025-08-04 12:19:52 +02:00
205 changed files with 6206 additions and 3925 deletions

View File

@@ -1,20 +1,28 @@
name: 'Load Kestra Plugin List'
description: 'Composite action to load list of plugins'
description: 'Composite action to load list of plugins (from .plugins) and output repositories and GA coordinates'
inputs:
plugin-version:
description: "Kestra version"
description: "Kestra version placeholder to replace LATEST in GA coordinates"
default: 'LATEST'
required: true
plugin-file:
description: "File of the plugins"
description: "Path to the .plugins file"
default: './.plugins'
required: true
include:
description: "Regex include filter applied on repository names"
required: false
default: ''
exclude:
description: "Regex exclude filter applied on repository names"
required: false
default: ''
outputs:
plugins:
description: "List of all Kestra plugins"
description: "Space-separated list of GA coordinates (group:artifact:version)"
value: ${{ steps.plugins.outputs.plugins }}
repositories:
description: "List of all Kestra repositories of plugins"
description: "Space-separated list of repository names (e.g., plugin-ai plugin-airbyte)"
value: ${{ steps.plugins.outputs.repositories }}
runs:
using: composite
@@ -23,7 +31,35 @@ runs:
id: plugins
shell: bash
run: |
PLUGINS=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | sed -e "s/LATEST/${{ inputs.plugin-version }}/g" | cut -d':' -f2- | xargs || echo '');
REPOSITORIES=$([ -f ${{ inputs.plugin-file }} ] && cat ${{ inputs.plugin-file }} | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | cut -d':' -f1 | uniq | sort | xargs || echo '')
echo "plugins=$PLUGINS" >> $GITHUB_OUTPUT
echo "repositories=$REPOSITORIES" >> $GITHUB_OUTPUT
set -euo pipefail
# Read only uncommented lines that contain io.kestra.* coordinates.
# This avoids the previous approach that 'uncommented' lines by stripping the first char after '#'.
if [[ -f "${{ inputs.plugin-file }}" ]]; then
ENABLED_LINES=$(grep -E '^\s*[^#]' "${{ inputs.plugin-file }}" | grep "io\.kestra\." || true)
else
ENABLED_LINES=""
fi
# Build GA coordinates by replacing LATEST with the provided plugin-version (if present)
PLUGINS=$(echo "$ENABLED_LINES" \
| sed -e "s/LATEST/${{ inputs.plugin-version }}/g" \
| cut -d':' -f2- \
| xargs || echo '')
# Extract repository names (first column), unique + sorted
REPOSITORIES=$(echo "$ENABLED_LINES" \
| cut -d':' -f1 \
| uniq | sort \
| xargs || echo '')
# Apply include/exclude filters if provided (POSIX ERE via grep -E)
if [ -n "${{ inputs.include }}" ] && [ -n "$REPOSITORIES" ]; then
REPOSITORIES=$(echo "$REPOSITORIES" | xargs -n1 | grep -E "${{ inputs.include }}" | xargs || true)
fi
if [ -n "${{ inputs.exclude }}" ] && [ -n "$REPOSITORIES" ]; then
REPOSITORIES=$(echo "$REPOSITORIES" | xargs -n1 | grep -Ev "${{ inputs.exclude }}" | xargs || true)
fi
echo "plugins=$PLUGINS" >> "$GITHUB_OUTPUT"
echo "repositories=$REPOSITORIES" >> "$GITHUB_OUTPUT"

View File

@@ -107,7 +107,7 @@ jobs:
# Download executable from artifact
- name: Artifacts - Download executable
if: github.event_name != 'workflow_dispatch' || steps.download-github-release.outcome == 'skipped'
uses: actions/download-artifact@v4
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable

View File

@@ -15,24 +15,111 @@ on:
description: 'Use DRY_RUN mode'
required: false
default: 'false'
type: choice
options: ['false', 'true']
repositories:
description: 'Space-separated repo names to release (e.g. "plugin-ai plugin-airbyte"). If empty, uses .plugins.'
required: false
type: string
include:
description: 'Regex include filter on repo names (applied when using .plugins)'
required: false
type: string
exclude:
description: 'Regex exclude filter on repo names (applied when using .plugins)'
required: false
type: string
onlyChanged:
description: 'Release only repos changed since last tag (or sinceTag if provided)'
required: false
default: 'false'
type: choice
options: ['false', 'true']
sinceTag:
description: 'Optional tag used as base for change detection (e.g. v0.24.0)'
required: false
type: string
jobs:
release:
name: Release plugins
prepare:
name: Compute target repositories
runs-on: ubuntu-latest
outputs:
matrix: ${{ steps.compute.outputs.matrix }}
steps:
# Checkout
# Checkout the current repo (assumed to contain .plugins and the workflow)
- uses: actions/checkout@v4
with:
fetch-depth: 0
# Checkout GitHub Actions
# Checkout the kestra-io/actions repo (for setup-build, etc.)
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Setup build
- name: Install tools
run: sudo apt-get update && sudo apt-get install -y jq
# Load repositories from .plugins (only uncommented lines) with optional include/exclude filters
- name: Get Plugins List
id: plugins-list
uses: ./.github/actions/plugins-list
with:
plugin-version: 'LATEST'
plugin-file: './.plugins'
include: ${{ github.event.inputs.include }}
exclude: ${{ github.event.inputs.exclude }}
# Finalize repo list:
# - If "repositories" input is provided, it takes precedence.
# - Otherwise, use the filtered list from the composite action.
- name: Build repo list
id: build-list
shell: bash
env:
INP_REPOS: ${{ github.event.inputs.repositories }}
run: |
set -euo pipefail
if [ -n "${INP_REPOS:-}" ]; then
LIST="${INP_REPOS}"
else
LIST="${{ steps.plugins-list.outputs.repositories }}"
fi
# Convert to JSON array for matrix
arr_json=$(printf '%s\n' $LIST | jq -R . | jq -s .)
echo "list=$LIST" >> "$GITHUB_OUTPUT"
echo "arr_json=$arr_json" >> "$GITHUB_OUTPUT"
- name: Compute matrix
id: compute
shell: bash
run: |
set -euo pipefail
echo "matrix={\"repo\": ${{ steps.build-list.outputs.arr_json }}}" >> "$GITHUB_OUTPUT"
release:
name: Release ${{ matrix.repo }}
needs: [prepare]
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix: ${{ fromJson(needs.prepare.outputs.matrix) }}
steps:
# Checkout the current repo (for dev-tools/release-plugins.sh)
- uses: actions/checkout@v4
with:
fetch-depth: 0
# Checkout the kestra-io/actions repo (for setup-build, etc.)
- uses: actions/checkout@v4
with:
repository: kestra-io/actions
path: actions
ref: main
# Build toolchain used by plugin builds
- uses: ./actions/.github/actions/setup-build
id: build
with:
@@ -41,42 +128,45 @@ jobs:
python-enabled: true
caches-enabled: true
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins-list
with:
plugin-version: 'LATEST'
- name: 'Configure Git'
- name: Configure Git
run: |
git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
# Execute
- name: Run Gradle Release
if: ${{ github.event.inputs.dryRun == 'false' }}
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
--yes \
${{ steps.plugins-list.outputs.repositories }}
chmod +x ./dev-tools/release-plugins.sh
ARGS=()
ARGS+=(--release-version="${{ github.event.inputs.releaseVersion }}")
ARGS+=(--next-version="${{ github.event.inputs.nextVersion }}")
ARGS+=(--yes)
if [ "${{ github.event.inputs.onlyChanged }}" = "true" ]; then
ARGS+=(--only-changed)
fi
if [ -n "${{ github.event.inputs.sinceTag }}" ]; then
ARGS+=(--since-tag="${{ github.event.inputs.sinceTag }}")
fi
./dev-tools/release-plugins.sh "${ARGS[@]}" "${{ matrix.repo }}"
# Dry-run release
- name: Run Gradle Release (DRY_RUN)
if: ${{ github.event.inputs.dryRun == 'true' }}
env:
GITHUB_PAT: ${{ secrets.GH_PERSONAL_TOKEN }}
run: |
chmod +x ./dev-tools/release-plugins.sh;
./dev-tools/release-plugins.sh \
--release-version=${{github.event.inputs.releaseVersion}} \
--next-version=${{github.event.inputs.nextVersion}} \
--dry-run \
--yes \
${{ steps.plugins-list.outputs.repositories }}
chmod +x ./dev-tools/release-plugins.sh
ARGS=()
ARGS+=(--release-version="${{ github.event.inputs.releaseVersion }}")
ARGS+=(--next-version="${{ github.event.inputs.nextVersion }}")
ARGS+=(--dry-run)
ARGS+=(--yes)
if [ "${{ github.event.inputs.onlyChanged }}" = "true" ]; then
ARGS+=(--only-changed)
fi
if [ -n "${{ github.event.inputs.sinceTag }}" ]; then
ARGS+=(--since-tag="${{ github.event.inputs.sinceTag }}")
fi
./dev-tools/release-plugins.sh "${ARGS[@]}" "${{ matrix.repo }}"

View File

@@ -38,7 +38,7 @@ jobs:
# Download Exec
# Must be done after checkout actions
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
uses: actions/download-artifact@v5
if: startsWith(github.ref, 'refs/tags/v')
with:
name: exe

View File

@@ -120,7 +120,7 @@ jobs:
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v4
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable

View File

@@ -0,0 +1,15 @@
name: Pull Request - Delete Docker
on:
pull_request:
types: [closed]
jobs:
publish:
name: Pull Request - Delete Docker
runs-on: ubuntu-latest
steps:
- uses: dataaxiom/ghcr-cleanup-action@v1
with:
package: kestra-pr
delete-tags: ${{ github.event.pull_request.number }}

View File

@@ -0,0 +1,76 @@
name: Pull Request - Publish Docker
on:
pull_request:
branches:
- develop
jobs:
build-artifacts:
name: Build Artifacts
uses: ./.github/workflows/workflow-build-artifacts.yml
publish:
name: Publish Docker
runs-on: ubuntu-latest
needs: build-artifacts
env:
GITHUB_IMAGE_PATH: "ghcr.io/kestra-io/kestra-pr"
steps:
- name: Checkout - Current ref
uses: actions/checkout@v4
with:
fetch-depth: 0
# Docker setup
- name: Docker - Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Setup Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to GHCR
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# Build Docker Image
- name: Artifacts - Download executable
uses: actions/download-artifact@v5
with:
name: exe
path: build/executable
- name: Docker - Copy exe to image
shell: bash
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
- name: Docker - Build image
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile.pr
push: true
tags: ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}
platforms: linux/amd64,linux/arm64
# Add comment on pull request
- name: Add comment to PR
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
await github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: `**🐋 Docker image**: \`${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }}\`\n` +
`\n` +
`\`\`\`bash\n` +
`docker run --pull=always --rm -it -p 8080:8080 --user=root -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp ${{ env.GITHUB_IMAGE_PATH }}:${{ github.event.pull_request.number }} server local\n` +
`\`\`\``
})

View File

@@ -19,6 +19,7 @@
#plugin-databricks:io.kestra.plugin:plugin-databricks:LATEST
#plugin-datahub:io.kestra.plugin:plugin-datahub:LATEST
#plugin-dataform:io.kestra.plugin:plugin-dataform:LATEST
#plugin-datagen:io.kestra.plugin:plugin-datagen:LATEST
#plugin-dbt:io.kestra.plugin:plugin-dbt:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-db2:LATEST
#plugin-debezium:io.kestra.plugin:plugin-debezium-mongodb:LATEST

7
Dockerfile.pr Normal file
View File

@@ -0,0 +1,7 @@
FROM kestra/kestra:develop
USER root
COPY --chown=kestra:kestra docker /
USER kestra

View File

@@ -65,10 +65,6 @@ Kestra is an open-source, event-driven orchestration platform that makes both **
## 🚀 Quick Start
### Try the Live Demo
Try Kestra with our [**Live Demo**](https://demo.kestra.io/ui/login?auto). No installation required!
### Get Started Locally in 5 Minutes
#### Launch Kestra in Docker

View File

@@ -16,7 +16,7 @@ plugins {
id "java"
id 'java-library'
id "idea"
id "com.gradleup.shadow" version "8.3.8"
id "com.gradleup.shadow" version "8.3.9"
id "application"
// test
@@ -620,6 +620,28 @@ subprojects {subProject ->
}
}
}
if (subProject.name != 'platform' && subProject.name != 'cli') {
// only if a test source set actually exists (avoids empty artifacts)
def hasTests = subProject.extensions.findByName('sourceSets')?.findByName('test') != null
if (hasTests) {
// wire the artifact onto every Maven publication of this subproject
publishing {
publications {
withType(MavenPublication).configureEach { pub ->
// keep the normal java component + sources/javadoc already configured
pub.artifact(subProject.tasks.named('testsJar').get())
}
}
}
// make sure publish tasks build the tests jar first
tasks.matching { it.name.startsWith('publish') }.configureEach {
dependsOn subProject.tasks.named('testsJar')
}
}
}
}
}

View File

@@ -16,6 +16,6 @@ abstract public class AbstractServerCommand extends AbstractCommand implements S
}
protected static int defaultWorkerThread() {
return Runtime.getRuntime().availableProcessors() * 4;
return Runtime.getRuntime().availableProcessors() * 8;
}
}

View File

@@ -48,7 +48,7 @@ public class StandAloneCommand extends AbstractServerCommand {
@CommandLine.Option(names = "--tenant", description = "Tenant identifier, Required to load flows from path with the enterprise edition")
private String tenantId;
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to four times the number of available processors. Set it to 0 to avoid starting a worker.")
@CommandLine.Option(names = {"--worker-thread"}, description = "the number of worker threads, defaults to eight times the number of available processors. Set it to 0 to avoid starting a worker.")
private int workerThread = defaultWorkerThread();
@CommandLine.Option(names = {"--skip-executions"}, split=",", description = "a list of execution identifiers to skip, separated by a coma; for troubleshooting purpose only")

View File

@@ -22,7 +22,7 @@ public class WorkerCommand extends AbstractServerCommand {
@Inject
private ApplicationContext applicationContext;
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to four times the number of available processors")
@Option(names = {"-t", "--thread"}, description = "The max number of worker threads, defaults to eight times the number of available processors")
private int thread = defaultWorkerThread();
@Option(names = {"-g", "--worker-group"}, description = "The worker group key, must match the regex [a-zA-Z0-9_-]+ (EE only)")

View File

@@ -122,12 +122,13 @@ public class JsonSchemaGenerator {
if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
.map(JsonNode::asText)
.toList();
.collect(Collectors.toList());
properties.fields().forEachRemaining(e -> {
int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
requiredPropsNode.remove(indexInRequiredArray);
requiredFieldValues.remove(indexInRequiredArray);
}
});

View File

@@ -139,6 +139,12 @@ public record QueryFilter(
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
EXECUTION_ID("executionId") {
@Override
public List<Op> supportedOp() {
return List.of(Op.EQUALS, Op.NOT_EQUALS, Op.CONTAINS, Op.STARTS_WITH, Op.ENDS_WITH, Op.IN, Op.NOT_IN);
}
},
CHILD_FILTER("childFilter") {
@Override
public List<Op> supportedOp() {
@@ -213,7 +219,7 @@ public record QueryFilter(
@Override
public List<Field> supportedField() {
return List.of(Field.QUERY, Field.SCOPE, Field.NAMESPACE, Field.START_DATE,
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL
Field.END_DATE, Field.FLOW_ID, Field.TRIGGER_ID, Field.MIN_LEVEL, Field.EXECUTION_ID
);
}
},

View File

@@ -122,7 +122,7 @@ public class Flow extends AbstractFlow implements HasUID {
AbstractRetry retry;
@Valid
@PluginProperty(beta = true)
@PluginProperty
List<SLA> sla;
public Stream<String> allTypes() {

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import java.util.Optional;
@@ -57,6 +58,7 @@ public interface FlowId {
@Getter
@AllArgsConstructor
@EqualsAndHashCode
class Default implements FlowId {
private final String tenantId;
private final String namespace;

View File

@@ -116,7 +116,7 @@ public class State {
}
public Instant maxDate() {
if (this.histories.size() == 0) {
if (this.histories.isEmpty()) {
return Instant.now();
}
@@ -124,7 +124,7 @@ public class State {
}
public Instant minDate() {
if (this.histories.size() == 0) {
if (this.histories.isEmpty()) {
return Instant.now();
}
@@ -173,6 +173,11 @@ public class State {
return this.current.isBreakpoint();
}
@JsonIgnore
public boolean isQueued() {
return this.current.isQueued();
}
@JsonIgnore
public boolean isRetrying() {
return this.current.isRetrying();
@@ -206,6 +211,14 @@ public class State {
return this.histories.get(this.histories.size() - 2).state.isPaused();
}
/**
* Return true if the execution has failed, then was restarted.
* This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
*/
public boolean failedThenRestarted() {
return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
}
@Introspected
public enum Type {
CREATED,
@@ -264,6 +277,10 @@ public class State {
return this == Type.KILLED;
}
public boolean isQueued(){
return this == Type.QUEUED;
}
/**
* @return states that are terminal to an execution
*/

View File

@@ -222,6 +222,7 @@ public class Trigger extends TriggerContext implements HasUID {
}
// If trigger is a schedule and execution ended after the next execution date
else if (abstractTrigger instanceof Schedule schedule &&
this.getNextExecutionDate() != null &&
execution.getState().getEndDate().get().isAfter(this.getNextExecutionDate().toInstant())
) {
RecoverMissedSchedules recoverMissedSchedules = Optional.ofNullable(schedule.getRecoverMissedSchedules())

View File

@@ -0,0 +1,12 @@
package io.kestra.core.queues;
import java.io.Serial;
public class UnsupportedMessageException extends QueueException {
@Serial
private static final long serialVersionUID = 1L;
public UnsupportedMessageException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -81,11 +81,7 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
Flux<LogEntry> findAsync(
@Nullable String tenantId,
@Nullable String namespace,
@Nullable String flowId,
@Nullable String executionId,
@Nullable Level minLevel,
ZonedDateTime startDate
List<QueryFilter> filters
);
Flux<LogEntry> findAllAsync(@Nullable String tenantId);
@@ -98,5 +94,7 @@ public interface LogRepositoryInterface extends SaveRepositoryInterface<LogEntry
void deleteByQuery(String tenantId, String namespace, String flowId, String triggerId);
void deleteByFilters(String tenantId, List<QueryFilter> filters);
int deleteByQuery(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate);
}

View File

@@ -86,7 +86,7 @@ public class Executor {
public Boolean canBeProcessed() {
return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint());
this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
}
public Executor withFlow(FlowWithSource flow) {

View File

@@ -764,6 +764,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
this.workerTaskResultQueue.emit(workerTaskResult);
// upload the cache file, hash may not be present if we didn't succeed in computing it
@@ -796,6 +797,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
// If it's a message too big, we remove the outputs
failed = failed.withOutputs(Variables.empty());
}
if (e instanceof UnsupportedMessageException) {
// we expect the offending char is in the output so we remove it
failed = failed.withOutputs(Variables.empty());
}
WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
@@ -818,7 +823,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
private Optional<String> hashTask(RunContext runContext, Task task) {
try {
var map = JacksonMapper.toMap(task);
var rMap = runContext.render(map);
// If there are task provided variables, rendering the task may fail.
// The best we can do is to add a fake 'workingDir' as it's an often added variables,
// and it should not be part of the task hash.
Map<String, Object> variables = Map.of("workingDir", "workingDir");
var rMap = runContext.render(map, variables);
var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
MessageDigest digest = MessageDigest.getInstance("SHA-256");
digest.update(json);

View File

@@ -1,5 +1,8 @@
package io.kestra.core.server;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.kestra.core.utils.Enums;
/**
* Supported Kestra's service types.
*/
@@ -9,4 +12,14 @@ public enum ServiceType {
SCHEDULER,
WEBSERVER,
WORKER,
INVALID;
@JsonCreator
public static ServiceType fromString(final String value) {
try {
return Enums.getForNameIgnoreCase(value, ServiceType.class, INVALID);
} catch (IllegalArgumentException e) {
return INVALID;
}
}
}

View File

@@ -547,29 +547,26 @@ public class FlowService {
throw noRepositoryException();
}
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly);
return expandAll ? recursiveFlowTopology(tenant, namespace, id, destinationOnly) : flowTopologies.stream();
return expandAll ? recursiveFlowTopology(new ArrayList<>(), tenant, namespace, id, destinationOnly) : flowTopologyRepository.get().findByFlow(tenant, namespace, id, destinationOnly).stream();
}
private Stream<FlowTopology> recursiveFlowTopology(String tenantId, String namespace, String flowId, boolean destinationOnly) {
private Stream<FlowTopology> recursiveFlowTopology(List<FlowId> flowIds, String tenantId, String namespace, String id, boolean destinationOnly) {
if (flowTopologyRepository.isEmpty()) {
throw noRepositoryException();
}
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, flowId, destinationOnly);
List<FlowTopology> subTopologies = flowTopologies.stream()
// filter on destination is not the current node to avoid an infinite loop
.filter(topology -> !(topology.getDestination().getTenantId().equals(tenantId) && topology.getDestination().getNamespace().equals(namespace) && topology.getDestination().getId().equals(flowId)))
.toList();
List<FlowTopology> flowTopologies = flowTopologyRepository.get().findByFlow(tenantId, namespace, id, destinationOnly);
if (subTopologies.isEmpty()) {
FlowId flowId = FlowId.of(tenantId, namespace, id, null);
if (flowIds.contains(flowId)) {
return flowTopologies.stream();
} else {
return Stream.concat(flowTopologies.stream(), subTopologies.stream()
.map(topology -> topology.getDestination())
// recursively fetch child nodes
.flatMap(destination -> recursiveFlowTopology(destination.getTenantId(), destination.getNamespace(), destination.getId(), destinationOnly)));
}
flowIds.add(flowId);
return flowTopologies.stream()
.flatMap(topology -> Stream.of(topology.getDestination(), topology.getSource()))
// recursively fetch child nodes
.flatMap(node -> recursiveFlowTopology(flowIds, node.getTenantId(), node.getNamespace(), node.getId(), destinationOnly));
}
private IllegalStateException noRepositoryException() {

View File

@@ -1,37 +1,193 @@
package io.kestra.core.utils;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
public class Either<L, R> {
private final Optional<L> left;
private final Optional<R> right;
private Either(Optional<L> left, Optional<R> right) {
this.left = left;
this.right = right;
}
/**
* Simple {@link Either} monad type.
*
* @param <L> the {@link Left} type.
* @param <R> the {@link Right} type.
*/
public abstract sealed class Either<L, R> permits Either.Left, Either.Right {
public static <L, R> Either<L, R> left(L value) {
return new Either<>(Optional.ofNullable(value), Optional.empty());
return new Left<>(value);
}
public boolean isLeft() {
return this.left.isPresent();
}
public L getLeft() {
return this.left.get();
}
public static <L, R> Either<L, R> right(R value) {
return new Either<>(Optional.empty(), Optional.ofNullable(value));
return new Right<>(value);
}
public boolean isRight() {
return this.right.isPresent();
/**
* Returns {@code true} if this is a {@link Left}, {@code false} otherwise.
*/
public abstract boolean isLeft();
/**
* Returns {@code true} if this is a {@link Right}, {@code false} otherwise.
*/
public abstract boolean isRight();
/**
* Returns the left value.
*
* @throws NoSuchElementException if is not left.
*/
public abstract L getLeft();
/**
* Returns the right value.
*
* @throws NoSuchElementException if is not right.
*/
public abstract R getRight();
public LeftProjection<L, R> left() {
return new LeftProjection<>(this);
}
public R getRight() {
return this.right.get();
public RightProjection<L, R> right() {
return new RightProjection<>(this);
}
}
public <T> T fold(final Function<L, T> fl, final Function<R, T> fr) {
return isLeft() ? fl.apply(getLeft()) : fr.apply(getRight());
}
public static final class Left<L, R> extends Either<L, R> {
private final L value;
private Left(L value) {
this.value = value;
}
/**
* @return {@code true}.
*/
@Override
public boolean isLeft() {
return true;
}
/**
* @return {@code false}.
*/
@Override
public boolean isRight() {
return false;
}
@Override
public L getLeft() {
return value;
}
@Override
public R getRight() {
throw new NoSuchElementException("This is Left");
}
}
public static final class Right<L, R> extends Either<L, R> {
private final R value;
private Right(R value) {
this.value = value;
}
/**
* @return {@code false}.
*/
@Override
public boolean isLeft() {
return false;
}
/**
* @return {@code true}.
*/
@Override
public boolean isRight() {
return true;
}
@Override
public L getLeft() {
throw new NoSuchElementException("This is Right");
}
@Override
public R getRight() {
return value;
}
}
public static class LeftProjection<L, R> {
private final Either<L, R> either;
LeftProjection(final Either<L, R> either) {
Objects.requireNonNull(either, "either can't be null");
this.either = either;
}
public boolean exists() {
return either.isLeft();
}
public L get() {
return either.getLeft();
}
public <LL> Either<LL, R> map(final Function<? super L, ? extends LL> fn) {
if (either.isLeft()) return Either.left(fn.apply(either.getLeft()));
else return Either.right(either.getRight());
}
public <LL> Either<LL, R> flatMap(final Function<? super L, Either<LL, R>> fn) {
if (either.isLeft()) return fn.apply(either.getLeft());
else return Either.right(either.getRight());
}
public Optional<L> toOptional() {
return exists() ? Optional.of(either.getLeft()) : Optional.empty();
}
}
public static class RightProjection<L, R> {
private final Either<L, R> either;
RightProjection(final Either<L, R> either) {
Objects.requireNonNull(either, "either can't be null");
this.either = either;
}
public boolean exists() {
return either.isRight();
}
public R get() {
return either.getRight();
}
public <RR> Either<L, RR> map(final Function<? super R, ? extends RR> fn) {
if (either.isRight()) return Either.right(fn.apply(either.getRight()));
else return Either.left(either.getLeft());
}
public <RR> Either<L, RR> flatMap(final Function<? super R, Either<L, RR>> fn) {
if (either.isRight()) return fn.apply(either.getRight());
else return Either.left(either.getLeft());
}
public Optional<R> toOptional() {
return exists() ? Optional.of(either.getRight()) : Optional.empty();
}
}
}

View File

@@ -4,6 +4,7 @@ import jakarta.annotation.Nullable;
import jakarta.validation.constraints.NotNull;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
@@ -118,6 +119,25 @@ public final class Enums {
));
}
/**
* Convert an object to a list of a specific enum.
* @param value the object to convert to list of enum.
* @param enumClass the class of the enum to convert to.
* @return A list of the corresponding enum type
* @param <T> The type of the enum.
* @throws IllegalArgumentException If the value does not match any enum value.
*/
public static <T extends Enum<T>> List<T> fromList(Object value, Class<T> enumClass) {
return switch (value) {
case List<?> list when !list.isEmpty() && enumClass.isInstance(list.getFirst()) -> (List<T>) list;
case List<?> list when !list.isEmpty() && list.getFirst() instanceof String ->
list.stream().map(item -> Enum.valueOf(enumClass, item.toString().toUpperCase())).collect(Collectors.toList());
case Enum<?> enumValue when enumClass.isInstance(enumValue) -> List.of(enumClass.cast(enumValue));
case String stringValue -> List.of(Enum.valueOf(enumClass, stringValue.toUpperCase()));
default -> throw new IllegalArgumentException("Field requires a " + enumClass.getSimpleName() + " or List<" + enumClass.getSimpleName() + "> value");
};
}
private Enums() {
}
}

View File

@@ -55,4 +55,20 @@ public class ListUtils {
return newList;
}
public static List<?> convertToList(Object object){
if (object instanceof List<?> list) {
return list;
} else {
throw new IllegalArgumentException("%s in not an instance of List".formatted(object.getClass()));
}
}
public static List<String> convertToListString(Object object){
if (object instanceof List<?> list && (list.isEmpty() || list.getFirst() instanceof String)) {
return (List<String>) list;
} else {
throw new IllegalArgumentException("%s in not an instance of List of String".formatted(object));
}
}
}

View File

@@ -169,7 +169,7 @@ public class MapUtils {
}
/**
* Utility method nested a flattened map.
* Utility method that nests a flattened map.
*
* @param flatMap the flattened map.
* @return the nested map.
@@ -203,4 +203,44 @@ public class MapUtils {
}
return result;
}
/**
* Utility method that flatten a nested map.
* <p>
* NOTE: for simplicity, this method didn't allow to flatten maps with conflicting keys that would end up in different flatten keys,
* this could be related later if needed by flattening {k1: k2: {k3: v1}, k1: {k4: v2}} to {k1.k2.k3: v1, k1.k4: v2} is prohibited for now.
*
* @param nestedMap the nested map.
* @return the flattened map.
*
* @throws IllegalArgumentException if any entry contains a map of more than one element.
*/
public static Map<String, Object> nestedToFlattenMap(@NotNull Map<String, Object> nestedMap) {
Map<String, Object> result = new TreeMap<>();
for (Map.Entry<String, Object> entry : nestedMap.entrySet()) {
if (entry.getValue() instanceof Map<?, ?> map) {
Map.Entry<String, Object> flatten = flattenEntry(entry.getKey(), (Map<String, Object>) map);
result.put(flatten.getKey(), flatten.getValue());
} else {
result.put(entry.getKey(), entry.getValue());
}
}
return result;
}
private static Map.Entry<String, Object> flattenEntry(String key, Map<String, Object> value) {
if (value.size() > 1) {
throw new IllegalArgumentException("You cannot flatten a map with an entry that is a map of more than one element, conflicting key: " + key);
}
Map.Entry<String, Object> entry = value.entrySet().iterator().next();
String newKey = key + "." + entry.getKey();
Object newValue = entry.getValue();
if (newValue instanceof Map<?, ?> map) {
return flattenEntry(newKey, (Map<String, Object>) map);
} else {
return Map.entry(newKey, newValue);
}
}
}

View File

@@ -9,11 +9,11 @@ import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.codehaus.commons.nullanalysis.NotNull;
import java.util.NoSuchElementException;

View File

@@ -112,7 +112,7 @@ class JsonSchemaGeneratorTest {
var requiredWithDefault = definitions.get("io.kestra.core.docs.JsonSchemaGeneratorTest-RequiredWithDefault");
assertThat(requiredWithDefault, is(notNullValue()));
assertThat((List<String>) requiredWithDefault.get("required"), not(contains("requiredWithDefault")));
assertThat((List<String>) requiredWithDefault.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
var properties = (Map<String, Map<String, Object>>) flow.get("properties");
var listeners = properties.get("listeners");
@@ -253,7 +253,7 @@ class JsonSchemaGeneratorTest {
void requiredAreRemovedIfThereIsADefault() {
Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, RequiredWithDefault.class);
assertThat(generate, is(not(nullValue())));
assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault")));
assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
assertThat((List<String>) generate.get("required"), containsInAnyOrder("requiredWithNoDefault"));
}
@@ -466,6 +466,11 @@ class JsonSchemaGeneratorTest {
@Builder.Default
private Property<TaskWithEnum.TestClass> requiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test").build());
@PluginProperty
@NotNull
@Builder.Default
private Property<TaskWithEnum.TestClass> anotherRequiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test2").build());
@PluginProperty
@NotNull
private Property<TaskWithEnum.TestClass> requiredWithNoDefault;

View File

@@ -94,6 +94,14 @@ public class QueryFilterTest {
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.ENDS_WITH).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.CONTAINS).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.EQUALS).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.NOT_EQUALS).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.IN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.NOT_IN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.STARTS_WITH).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.ENDS_WITH).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.CONTAINS).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.EQUALS).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.NOT_EQUALS).build(), Resource.EXECUTION),
@@ -204,6 +212,13 @@ public class QueryFilterTest {
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.REGEX).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.TRIGGER_ID).operation(Op.PREFIX).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.GREATER_THAN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.LESS_THAN).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.LESS_THAN_OR_EQUAL_TO).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.REGEX).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.EXECUTION_ID).operation(Op.PREFIX).build(), Resource.LOG),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.GREATER_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.LESS_THAN).build(), Resource.EXECUTION),
Arguments.of(QueryFilter.builder().field(Field.CHILD_FILTER).operation(Op.GREATER_THAN_OR_EQUAL_TO).build(), Resource.EXECUTION),

View File

@@ -199,6 +199,7 @@ public abstract class AbstractExecutionRepositoryTest {
return Stream.of(
QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.WORKER_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build()

View File

@@ -160,6 +160,7 @@ public abstract class AbstractFlowRepositoryTest {
QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("executionTriggerId").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.WORKER_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),

View File

@@ -34,6 +34,7 @@ import static io.kestra.core.models.flows.FlowScope.SYSTEM;
import static io.kestra.core.models.flows.FlowScope.USER;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatReflectiveOperationException;
import static org.junit.jupiter.api.Assertions.assertThrows;
@KestraTest
@@ -42,11 +43,15 @@ public abstract class AbstractLogRepositoryTest {
protected LogRepositoryInterface logRepository;
protected static LogEntry.LogEntryBuilder logEntry(Level level) {
return logEntry(level, IdUtils.create());
}
protected static LogEntry.LogEntryBuilder logEntry(Level level, String executionId) {
return LogEntry.builder()
.flowId("flowId")
.namespace("io.kestra.unittest")
.taskId("taskId")
.executionId("executionId")
.executionId(executionId)
.taskRunId(IdUtils.create())
.attemptNumber(0)
.timestamp(Instant.now())
@@ -60,13 +65,36 @@ public abstract class AbstractLogRepositoryTest {
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_all(QueryFilter filter){
logRepository.save(logEntry(Level.INFO).build());
logRepository.save(logEntry(Level.INFO, "executionId").build());
ArrayListTotal<LogEntry> entries = logRepository.find(Pageable.UNPAGED, MAIN_TENANT, List.of(filter));
assertThat(entries).hasSize(1);
}
@ParameterizedTest
@MethodSource("filterCombinations")
void should_find_async(QueryFilter filter){
logRepository.save(logEntry(Level.INFO, "executionId").build());
Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, List.of(filter));
List<LogEntry> logEntries = find.collectList().block();
assertThat(logEntries).hasSize(1);
}
@ParameterizedTest
@MethodSource("filterCombinations")
void should_delete_with_filter(QueryFilter filter){
logRepository.save(logEntry(Level.INFO, "executionId").build());
logRepository.deleteByFilters(MAIN_TENANT, List.of(filter));
assertThat(logRepository.findAllAsync(MAIN_TENANT).collectList().block()).isEmpty();
}
static Stream<QueryFilter> filterCombinations() {
return Stream.of(
QueryFilter.builder().field(Field.QUERY).value("flowId").operation(Op.EQUALS).build(),
@@ -105,6 +133,13 @@ public abstract class AbstractLogRepositoryTest {
QueryFilter.builder().field(Field.TRIGGER_ID).value("Id").operation(Op.ENDS_WITH).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value(List.of("triggerId")).operation(Op.IN).build(),
QueryFilter.builder().field(Field.TRIGGER_ID).value(List.of("anotherId")).operation(Op.NOT_IN).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("executionId").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("anotherId").operation(Op.NOT_EQUALS).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("xecution").operation(Op.CONTAINS).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("execution").operation(Op.STARTS_WITH).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("Id").operation(Op.ENDS_WITH).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value(List.of("executionId")).operation(Op.IN).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value(List.of("anotherId")).operation(Op.NOT_IN).build(),
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.ERROR).operation(Op.NOT_EQUALS).build()
);
@@ -284,36 +319,6 @@ public abstract class AbstractLogRepositoryTest {
assertThat(find.size()).isZero();
}
@Test
void findAsync() {
logRepository.save(logEntry(Level.INFO).build());
logRepository.save(logEntry(Level.ERROR).build());
logRepository.save(logEntry(Level.WARN).build());
logRepository.save(logEntry(Level.INFO).executionKind(ExecutionKind.TEST).build()); // should not be visible here
ZonedDateTime startDate = ZonedDateTime.now().minusSeconds(1);
Flux<LogEntry> find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unittest", null, null, Level.INFO, startDate);
List<LogEntry> logEntries = find.collectList().block();
assertThat(logEntries).hasSize(3);
find = logRepository.findAsync(MAIN_TENANT, null, null, null, Level.ERROR, startDate);
logEntries = find.collectList().block();
assertThat(logEntries).hasSize(1);
find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unittest", "flowId", null, Level.ERROR, startDate);
logEntries = find.collectList().block();
assertThat(logEntries).hasSize(1);
find = logRepository.findAsync(MAIN_TENANT, "io.kestra.unused", "flowId", "executionId", Level.INFO, startDate);
logEntries = find.collectList().block();
assertThat(logEntries).hasSize(0);
find = logRepository.findAsync(MAIN_TENANT, null, null, null, Level.INFO, startDate.plusSeconds(2));
logEntries = find.collectList().block();
assertThat(logEntries).hasSize(0);
}
@Test
void findAllAsync() {
logRepository.save(logEntry(Level.INFO).build());

View File

@@ -101,6 +101,7 @@ public abstract class AbstractTriggerRepositoryTest {
QueryFilter.builder().field(Field.STATE).value(State.Type.RUNNING).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TIME_RANGE).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.TRIGGER_EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXECUTION_ID).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.CHILD_FILTER).value(ChildFilter.CHILD).operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.EXISTING_ONLY).value("test").operation(Op.EQUALS).build(),
QueryFilter.builder().field(Field.MIN_LEVEL).value(Level.DEBUG).operation(Op.EQUALS).build()

View File

@@ -423,6 +423,12 @@ public abstract class AbstractRunnerTest {
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
void concurrencyQueueRestarted() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
}
@Test
@ExecuteFlow("flows/valids/executable-fail.yml")
void badExecutable(Execution execution) {

View File

@@ -8,6 +8,7 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
@@ -53,6 +54,9 @@ public class FlowConcurrencyCaseTest {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Inject
private ExecutionService executionService;
public void flowConcurrencyCancel() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel");
@@ -278,6 +282,66 @@ public class FlowConcurrencyCaseTest {
assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
}
public void flowConcurrencyQueueRestarted() throws Exception {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
var executionResult1 = new AtomicReference<Execution>();
var executionResult2 = new AtomicReference<Execution>();
CountDownLatch latch1 = new CountDownLatch(2);
AtomicReference<Execution> failedExecution = new AtomicReference<>();
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
executionResult1.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == Type.FAILED) {
failedExecution.set(e.getLeft());
latch1.countDown();
}
}
if (e.getLeft().getId().equals(execution2.getId())) {
executionResult2.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
latch2.countDown();
}
if (e.getLeft().getState().getCurrent() == Type.FAILED) {
latch3.countDown();
}
}
});
assertTrue(latch2.await(1, TimeUnit.MINUTES));
assertThat(failedExecution.get()).isNotNull();
// here the first fail and the second is now running.
// we restart the first one, it should be queued then fail again.
Execution restarted = executionService.restart(failedExecution.get(), null);
executionQueue.emit(restarted);
assertTrue(latch3.await(1, TimeUnit.MINUTES));
assertTrue(latch1.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(Type.FAILED);
// it should have been queued after restarted
assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED)).isTrue();
assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.QUEUED)).isTrue();
assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(Type.FAILED);
assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
private URI storageUpload() throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");

View File

@@ -5,6 +5,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.queues.QueueException;
@@ -18,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -77,8 +79,12 @@ public class TaskCacheTest {
@Plugin
public static class CounterTask extends Task implements RunnableTask<CounterTask.Output> {
private String workingDir;
@Override
public Output run(RunContext runContext) throws Exception {
Map<String, Object> variables = Map.of("workingDir", runContext.workingDir().path().toString());
runContext.render(this.workingDir, variables);
return Output.builder()
.counter(COUNTER.incrementAndGet())
.build();

View File

@@ -0,0 +1,462 @@
package io.kestra.core.utils;
import org.junit.jupiter.api.Test;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Function;
import static org.assertj.core.api.Assertions.*;
class EitherTest {
@Test
void shouldCreateLeftInstance() {
// Given
String leftValue = "error";
// When
Either<String, Integer> either = Either.left(leftValue);
// Then
assertThat(either).isInstanceOf(Either.Left.class);
assertThat(either.isLeft()).isTrue();
assertThat(either.isRight()).isFalse();
assertThat(either.getLeft()).isEqualTo(leftValue);
}
@Test
void shouldCreateRightInstance() {
// Given
Integer rightValue = 42;
// When
Either<String, Integer> either = Either.right(rightValue);
// Then
assertThat(either).isInstanceOf(Either.Right.class);
assertThat(either.isRight()).isTrue();
assertThat(either.isLeft()).isFalse();
assertThat(either.getRight()).isEqualTo(rightValue);
}
@Test
void shouldCreateLeftWithNullValue() {
// When
Either<String, Integer> either = Either.left(null);
// Then
assertThat(either.isLeft()).isTrue();
assertThat(either.getLeft()).isNull();
}
@Test
void shouldCreateRightWithNullValue() {
// When
Either<String, Integer> either = Either.right(null);
// Then
assertThat(either.isRight()).isTrue();
assertThat(either.getRight()).isNull();
}
@Test
void leftShouldReturnCorrectValues() {
// Given
String leftValue = "error message";
Either<String, Integer> either = Either.left(leftValue);
// Then
assertThat(either.isLeft()).isTrue();
assertThat(either.isRight()).isFalse();
assertThat(either.getLeft()).isEqualTo(leftValue);
}
@Test
void leftShouldThrowExceptionWhenGettingRightValue() {
// Given
Either<String, Integer> either = Either.left("error");
// When/Then
assertThatThrownBy(either::getRight)
.isInstanceOf(NoSuchElementException.class)
.hasMessage("This is Left");
}
@Test
void rightShouldReturnCorrectValues() {
// Given
Integer rightValue = 100;
Either<String, Integer> either = Either.right(rightValue);
// Then
assertThat(either.isRight()).isTrue();
assertThat(either.isLeft()).isFalse();
assertThat(either.getRight()).isEqualTo(rightValue);
}
@Test
void rightShouldThrowExceptionWhenGettingLeftValue() {
// Given
Either<String, Integer> either = Either.right(42);
// When/Then
assertThatThrownBy(either::getLeft)
.isInstanceOf(NoSuchElementException.class)
.hasMessage("This is Right");
}
@Test
void shouldApplyLeftFunctionForLeftInstanceInFold() {
// Given
Either<String, Integer> either = Either.left("error");
Function<String, String> leftFn = s -> "Left: " + s;
Function<Integer, String> rightFn = i -> "Right: " + i;
// When
String result = either.fold(leftFn, rightFn);
// Then
assertThat(result).isEqualTo("Left: error");
}
@Test
void shouldApplyRightFunctionForRightInstanceInFold() {
// Given
Either<String, Integer> either = Either.right(42);
Function<String, String> leftFn = s -> "Left: " + s;
Function<Integer, String> rightFn = i -> "Right: " + i;
// When
String result = either.fold(leftFn, rightFn);
// Then
assertThat(result).isEqualTo("Right: 42");
}
@Test
void shouldHandleNullReturnValuesInFold() {
// Given
Either<String, Integer> leftEither = Either.left("error");
Either<String, Integer> rightEither = Either.right(42);
// When
String leftResult = leftEither.fold(s -> null, i -> "not null");
String rightResult = rightEither.fold(s -> "not null", i -> null);
// Then
assertThat(leftResult).isNull();
assertThat(rightResult).isNull();
}
@Test
void leftProjectionShouldExistForLeftInstance() {
// Given
Either<String, Integer> either = Either.left("error");
// When
Either.LeftProjection<String, Integer> projection = either.left();
// Then
assertThat(projection.exists()).isTrue();
assertThat(projection.get()).isEqualTo("error");
}
@Test
void leftProjectionShouldNotExistForRightInstance() {
// Given
Either<String, Integer> either = Either.right(42);
// When
Either.LeftProjection<String, Integer> projection = either.left();
// Then
assertThat(projection.exists()).isFalse();
assertThatThrownBy(projection::get)
.isInstanceOf(NoSuchElementException.class)
.hasMessage("This is Right");
}
@Test
void leftProjectionMapShouldTransformLeftValue() {
// Given
Either<String, Integer> either = Either.left("error");
// When
Either<Integer, Integer> result = either.left().map(String::length);
// Then
assertThat(result.isLeft()).isTrue();
assertThat(result.getLeft()).isEqualTo(5);
}
@Test
void leftProjectionMapShouldPreserveRightValue() {
// Given
Either<String, Integer> either = Either.right(42);
// When
Either<Integer, Integer> result = either.left().map(String::length);
// Then
assertThat(result.isRight()).isTrue();
assertThat(result.getRight()).isEqualTo(42);
}
@Test
void leftProjectionFlatMapShouldTransformLeftValue() {
// Given
Either<String, Integer> either = Either.left("error");
// When
Either<Integer, Integer> result = either.left().flatMap(s -> Either.left(s.length()));
// Then
assertThat(result.isLeft()).isTrue();
assertThat(result.getLeft()).isEqualTo(5);
}
@Test
void leftProjectionFlatMapShouldPreserveRightValue() {
// Given
Either<String, Integer> either = Either.right(42);
// When
Either<Integer, Integer> result = either.left().flatMap(s -> Either.left(s.length()));
// Then
assertThat(result.isRight()).isTrue();
assertThat(result.getRight()).isEqualTo(42);
}
@Test
void leftProjectionFlatMapCanReturnRight() {
// Given
Either<String, Integer> either = Either.left("error");
// When
Either<String, Integer> result = either.left().flatMap(s -> Either.right(999));
// Then
assertThat(result.isRight()).isTrue();
assertThat(result.getRight()).isEqualTo(999);
}
@Test
void leftProjectionToOptionalShouldReturnPresentForLeft() {
// Given
Either<String, Integer> either = Either.left("error");
// When
Optional<String> optional = either.left().toOptional();
// Then
assertThat(optional).isPresent();
assertThat(optional.get()).isEqualTo("error");
}
@Test
void leftProjectionToOptionalShouldReturnEmptyForRight() {
// Given
Either<String, Integer> either = Either.right(42);
// When
Optional<String> optional = either.left().toOptional();
// Then
assertThat(optional).isEmpty();
}
@Test
void leftProjectionConstructorShouldThrowForNullEither() {
// When/Then
assertThatThrownBy(() -> new Either.LeftProjection<>(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("either can't be null");
}
@Test
void rightProjectionShouldExistForRightInstance() {
// Given
Either<String, Integer> either = Either.right(42);
// When
Either.RightProjection<String, Integer> projection = either.right();
// Then
assertThat(projection.exists()).isTrue();
assertThat(projection.get()).isEqualTo(42);
}
@Test
void rightProjectionShouldNotExistForLeftInstance() {
// Given
Either<String, Integer> either = Either.left("error");
// When
Either.RightProjection<String, Integer> projection = either.right();
// Then
assertThat(projection.exists()).isFalse();
assertThatThrownBy(projection::get)
.isInstanceOf(NoSuchElementException.class)
.hasMessage("This is Left");
}
@Test
void rightProjectionMapShouldTransformRightValue() {
// Given
Either<String, Integer> either = Either.right(42);
// When
Either<String, String> result = either.right().map(Object::toString);
// Then
assertThat(result.isRight()).isTrue();
assertThat(result.getRight()).isEqualTo("42");
}
@Test
void rightProjectionMapShouldPreserveLeftValue() {
// Given
Either<String, Integer> either = Either.left("error");
// When
Either<String, String> result = either.right().map(Object::toString);
// Then
assertThat(result.isLeft()).isTrue();
assertThat(result.getLeft()).isEqualTo("error");
}
@Test
void rightProjectionFlatMapShouldTransformRightValue() {
// Given
Either<String, Integer> either = Either.right(42);
// When
Either<String, String> result = either.right().flatMap(i -> Either.right(i.toString()));
// Then
assertThat(result.isRight()).isTrue();
assertThat(result.getRight()).isEqualTo("42");
}
@Test
void rightProjectionFlatMapShouldPreserveLeftValue() {
// Given
Either<String, Integer> either = Either.left("error");
// When
Either<String, String> result = either.right().flatMap(i -> Either.right(i.toString()));
// Then
assertThat(result.isLeft()).isTrue();
assertThat(result.getLeft()).isEqualTo("error");
}
@Test
void rightProjectionFlatMapCanReturnLeft() {
// Given
Either<String, Integer> either = Either.right(42);
// When
Either<String, Integer> result = either.right().flatMap(i -> Either.left("converted"));
// Then
assertThat(result.isLeft()).isTrue();
assertThat(result.getLeft()).isEqualTo("converted");
}
@Test
void rightProjectionToOptionalShouldReturnPresentForRight() {
// Given
Either<String, Integer> either = Either.right(42);
// When
Optional<Integer> optional = either.right().toOptional();
// Then
assertThat(optional).isPresent();
assertThat(optional.get()).isEqualTo(42);
}
@Test
void rightProjectionToOptionalShouldReturnEmptyForLeft() {
// Given
Either<String, Integer> either = Either.left("error");
// When
Optional<Integer> optional = either.right().toOptional();
// Then
assertThat(optional).isEmpty();
}
@Test
void rightProjectionConstructorShouldThrowForNullEither() {
// When/Then
assertThatThrownBy(() -> new Either.RightProjection<>(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("either can't be null");
}
@Test
void shouldHandleNullValuesInTransformations() {
// Given
Either<String, Integer> leftEither = Either.left(null);
Either<String, Integer> rightEither = Either.right(null);
// When/Then
assertThat(leftEither.left().map(s -> s == null ? "was null" : s).getLeft())
.isEqualTo("was null");
assertThat(rightEither.right().map(i -> i == null ? "was null" : i.toString()).getRight())
.isEqualTo("was null");
}
@Test
void shouldHandleComplexTypeTransformations() {
// Given
Either<Exception, String> either = Either.right("hello world");
// When
Either<String, Integer> result = either
.left().map(Exception::getMessage)
.right().map(String::length);
// Then
assertThat(result.isRight()).isTrue();
assertThat(result.getRight()).isEqualTo(11);
}
@Test
void shouldChainTransformationsCorrectly() {
// Given
Either<String, Integer> either = Either.right(10);
// When
Either<String, String> result = either
.right().flatMap(i -> i > 5 ? Either.right(i * 2) : Either.left("too small"))
.right().map(i -> "Result: " + i);
// Then
assertThat(result.isRight()).isTrue();
assertThat(result.getRight()).isEqualTo("Result: 20");
}
@Test
void shouldHandleProjectionChainingWithErrorCases() {
// Given
Either<String, Integer> either = Either.right(3);
// When
Either<String, String> result = either
.right().flatMap(i -> i > 5 ? Either.right(i * 2) : Either.left("too small"))
.right().map(i -> "Result: " + i);
// Then
assertThat(result.isLeft()).isTrue();
assertThat(result.getLeft()).isEqualTo("too small");
}
}

View File

@@ -1,6 +1,9 @@
package io.kestra.core.utils;
import org.junit.Assert;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -25,7 +28,7 @@ class EnumsTest {
@Test
void shouldThrowExceptionGivenInvalidString() {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
assertThrows(IllegalArgumentException.class, () -> {
Enums.getForNameIgnoreCase("invalid", TestEnum.class);
});
}
@@ -49,11 +52,22 @@ class EnumsTest {
String invalidValue = "invalidValue";
// Act & Assert
IllegalArgumentException exception = Assert.assertThrows(IllegalArgumentException.class, () ->
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () ->
Enums.fromString(invalidValue, mapping, "TestEnumWithValue")
);
}
@Test
void should_get_from_list(){
assertThat(Enums.fromList(List.of(TestEnum.ENUM1, TestEnum.ENUM2), TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1, TestEnum.ENUM2));
assertThat(Enums.fromList(List.of("ENUM1", "ENUM2"), TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1, TestEnum.ENUM2));
assertThat(Enums.fromList(TestEnum.ENUM1, TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1));
assertThat(Enums.fromList("ENUM1", TestEnum.class)).isEqualTo(List.of(TestEnum.ENUM1));
assertThrows(IllegalArgumentException.class, () -> Enums.fromList(List.of("string1", "string2"), TestEnum.class));
assertThrows(IllegalArgumentException.class, () -> Enums.fromList("non enum value", TestEnum.class));
}
enum TestEnum {
ENUM1, ENUM2
}

View File

@@ -6,6 +6,7 @@ import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
class ListUtilsTest {
@@ -36,4 +37,19 @@ class ListUtilsTest {
assertThat(ListUtils.concat(list1, null)).isEqualTo(List.of("1", "2"));
assertThat(ListUtils.concat(null, list2)).isEqualTo(List.of("3", "4"));
}
@Test
void convertToList(){
assertThat(ListUtils.convertToList(List.of(1, 2, 3))).isEqualTo(List.of(1, 2, 3));
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToList("not a list"));
}
@Test
void convertToListString(){
assertThat(ListUtils.convertToListString(List.of("string1", "string2"))).isEqualTo(List.of("string1", "string2"));
assertThat(ListUtils.convertToListString(List.of())).isEqualTo(List.of());
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString("not a list"));
assertThrows(IllegalArgumentException.class, () -> ListUtils.convertToListString(List.of(1, 2, 3)));
}
}

View File

@@ -9,6 +9,7 @@ import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
class MapUtilsTest {
@SuppressWarnings("unchecked")
@@ -194,4 +195,23 @@ class MapUtilsTest {
assertThat(results).hasSize(1);
// due to ordering change on each JVM restart, the result map would be different as different entries will be skipped
}
@Test
void shouldFlattenANestedMap() {
Map<String, Object> results = MapUtils.nestedToFlattenMap(Map.of("k1",Map.of("k2", Map.of("k3", "v1")), "k4", "v2"));
assertThat(results).hasSize(2);
assertThat(results).containsAllEntriesOf(Map.of(
"k1.k2.k3", "v1",
"k4", "v2"
));
}
@Test
void shouldThrowIfNestedMapContainsMultipleEntries() {
var exception = assertThrows(IllegalArgumentException.class,
() -> MapUtils.nestedToFlattenMap(Map.of("k1", Map.of("k2", Map.of("k3", "v1"), "k4", "v2")))
);
assertThat(exception.getMessage()).isEqualTo("You cannot flatten a map with an entry that is a map of more than one element, conflicting key: k1");
}
}

View File

@@ -4,6 +4,7 @@ namespace: io.kestra.tests
tasks:
- id: cache
type: io.kestra.core.runners.TaskCacheTest$CounterTask
workingDir: "{{workingDir}}"
taskCache:
enabled: true
ttl: PT1S

View File

@@ -0,0 +1,13 @@
id: flow-concurrency-queue-fail
namespace: io.kestra.tests
concurrency:
behavior: QUEUE
limit: 1
tasks:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: PT2S
- id: fail
type: io.kestra.plugin.core.execution.Fail

View File

@@ -1,152 +1,199 @@
#!/bin/bash
#!/usr/bin/env bash
#===============================================================================
# SCRIPT: release-plugins.sh
#
# DESCRIPTION:
# This script can be used to run a ./gradlew release command on each kestra plugin repository.
# By default, if no `GITHUB_PAT` environment variable exist, the script will attempt to clone GitHub repositories using SSH_KEY.
# Runs Gradle release for one or multiple Kestra plugin repositories.
# - If $GITHUB_PAT is set, HTTPS cloning via PAT is used.
# - Otherwise, SSH cloning is used (requires SSH key configured on runner).
#
# USAGE:
# ./release-plugins.sh [options] [plugin-repositories...]
#
# USAGE: ./release-plugins.sh [options]
# OPTIONS:
# --release-version <version> Specify the release version (required)
# --next-version <version> Specify the next version (required)
# --dry-run Specify to run in DRY_RUN.
# -y, --yes Automatically confirm prompts (non-interactive).
# -h, --help Show the help message and exit
# --release-version <version> Specify the release version (required).
# --next-version <version> Specify the next (development) version (required).
# --plugin-file <path> File containing the plugin list (default: ../.plugins).
# --dry-run Run in DRY_RUN mode (no publish, no changes pushed).
# --only-changed Skip repositories with no commits since last tag (or --since-tag).
# --since-tag <tag> Use this tag as base for change detection (default: last tag).
# -y, --yes Automatically confirm prompts (non-interactive).
# -h, --help Show this help message and exit.
#
# EXAMPLES:
# To release all plugins:
# ./release-plugins.sh --release-version=0.20.0 --next-version=0.21.0-SNAPSHOT
# To release a specific plugin:
# ./release-plugins.sh --release-version=0.20.0 --next-version=0.21.0-SNAPSHOT plugin-kubernetes
# To release specific plugins from file:
# ./release-plugins.sh --release-version=0.20.0 --plugin-file .plugins
# # Release all plugins from .plugins:
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT
#
# # Release a specific plugin:
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT plugin-kubernetes
#
# # Release specific plugins from file:
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT --plugin-file .plugins
#
# # Release only plugins that have changed since the last tag:
# ./release-plugins.sh --release-version=1.0.0 --next-version=1.1.0-SNAPSHOT --only-changed --yes
#===============================================================================
set -e;
set -euo pipefail
###############################################################
# Global vars
# Globals
###############################################################
BASEDIR=$(dirname "$(readlink -f $0)")
WORKING_DIR=/tmp/kestra-release-plugins-$(date +%s);
BASEDIR=$(dirname "$(readlink -f "$0")")
WORKING_DIR="/tmp/kestra-release-plugins-$(date +%s)"
PLUGIN_FILE="$BASEDIR/../.plugins"
GIT_BRANCH=master
GIT_BRANCH="master" # Fallback if default branch cannot be detected
###############################################################
# Functions
###############################################################
# Function to display the help message
usage() {
echo "Usage: $0 --release-version <version> --next-version [plugin-repositories...]"
echo
echo "Options:"
echo " --release-version <version> Specify the release version (required)."
echo " --next-version <version> Specify the next version (required)."
echo " --plugin-file File containing the plugin list (default: .plugins)"
echo " --dry-run Specify to run in DRY_RUN."
echo " -y, --yes Automatically confirm prompts (non-interactive)."
echo " -h, --help Show this help message and exit."
exit 1
echo "Usage: $0 --release-version <version> --next-version <version> [options] [plugin-repositories...]"
echo
echo "Options:"
echo " --release-version <version> Specify the release version (required)."
echo " --next-version <version> Specify the next version (required)."
echo " --plugin-file <path> File containing the plugin list (default: ../.plugins)."
echo " --dry-run Run in DRY_RUN mode."
echo " --only-changed Skip repositories with no commits since last tag (or --since-tag)."
echo " --since-tag <tag> Use this tag as base for change detection (default: last tag)."
echo " -y, --yes Automatically confirm prompts (non-interactive)."
echo " -h, --help Show this help message and exit."
exit 1
}
# Function to ask to continue
function askToContinue() {
read -p "Are you sure you want to continue? [y/N] " confirm
askToContinue() {
read -r -p "Are you sure you want to continue? [y/N] " confirm
[[ "$confirm" =~ ^[Yy]$ ]] || { echo "Operation cancelled."; exit 1; }
}
# Detect default branch from remote; fallback to $GIT_BRANCH if unknown
detect_default_branch() {
local default_branch
default_branch=$(git remote show origin | sed -n '/HEAD branch/s/.*: //p' || true)
if [[ -z "${default_branch:-}" ]]; then
default_branch="$GIT_BRANCH"
fi
echo "$default_branch"
}
# Return last tag that matches v* or any tag if v* not found; empty if none
last_tag_or_empty() {
local tag
tag=$(git tag --list 'v*' --sort=-v:refname | head -n1 || true)
if [[ -z "${tag:-}" ]]; then
tag=$(git tag --sort=-creatordate | head -n1 || true)
fi
echo "$tag"
}
# True (0) if there are commits since tag on branch, False (1) otherwise.
has_changes_since_tag() {
local tag="$1"
local branch="$2"
if [[ -z "$tag" ]]; then
# No tag => consider it as changed (first release)
return 0
fi
git fetch --tags --quiet
git fetch origin "$branch" --quiet
local count
count=$(git rev-list --count "${tag}..origin/${branch}" || echo "0")
[[ "${count}" -gt 0 ]]
}
###############################################################
# Options
# Options parsing
###############################################################
PLUGINS_ARGS=()
AUTO_YES=false
DRY_RUN=false
# Get the options
ONLY_CHANGED=false
SINCE_TAG=""
RELEASE_VERSION=""
NEXT_VERSION=""
while [[ "$#" -gt 0 ]]; do
case "$1" in
--release-version)
RELEASE_VERSION="$2"
shift 2
;;
--release-version=*)
RELEASE_VERSION="${1#*=}"
shift
;;
--next-version)
NEXT_VERSION="$2"
shift 2
;;
--next-version=*)
NEXT_VERSION="${1#*=}"
shift
;;
--plugin-file)
PLUGIN_FILE="$2"
shift 2
;;
--plugin-file=*)
PLUGIN_FILE="${1#*=}"
shift
;;
--dry-run)
DRY_RUN=true
shift
;;
-y|--yes)
AUTO_YES=true
shift
;;
-h|--help)
usage
;;
*)
PLUGINS_ARGS+=("$1")
shift
;;
esac
case "$1" in
--release-version)
RELEASE_VERSION="$2"; shift 2 ;;
--release-version=*)
RELEASE_VERSION="${1#*=}"; shift ;;
--next-version)
NEXT_VERSION="$2"; shift 2 ;;
--next-version=*)
NEXT_VERSION="${1#*=}"; shift ;;
--plugin-file)
PLUGIN_FILE="$2"; shift 2 ;;
--plugin-file=*)
PLUGIN_FILE="${1#*=}"; shift ;;
--dry-run)
DRY_RUN=true; shift ;;
--only-changed)
ONLY_CHANGED=true; shift ;;
--since-tag)
SINCE_TAG="$2"; shift 2 ;;
--since-tag=*)
SINCE_TAG="${1#*=}"; shift ;;
-y|--yes)
AUTO_YES=true; shift ;;
-h|--help)
usage ;;
*)
PLUGINS_ARGS+=("$1"); shift ;;
esac
done
## Check options
# Required options
if [[ -z "$RELEASE_VERSION" ]]; then
echo -e "Missing required argument: --release-version\n";
usage
echo -e "Missing required argument: --release-version\n"; usage
fi
if [[ -z "$NEXT_VERSION" ]]; then
echo -e "Missing required argument: --next-version\n";
usage
echo -e "Missing required argument: --next-version\n"; usage
fi
## Get plugin list
###############################################################
# Build plugin list (from args or from .plugins)
###############################################################
PLUGINS_ARRAY=()
PLUGINS_COUNT=0
if [[ "${#PLUGINS_ARGS[@]}" -eq 0 ]]; then
if [ -f "$PLUGIN_FILE" ]; then
PLUGINS=$(cat "$PLUGIN_FILE" | grep "io\\.kestra\\." | sed -e '/#/s/^.//' | cut -d':' -f1 | uniq | sort);
PLUGINS_COUNT=$(echo "$PLUGINS" | wc -l);
PLUGINS_ARRAY=$(echo "$PLUGINS" | xargs || echo '');
PLUGINS_ARRAY=($PLUGINS_ARRAY);
if [[ -f "$PLUGIN_FILE" ]]; then
# Keep only uncommented lines, then keep the first column (repo name)
mapfile -t PLUGINS_ARRAY < <(
grep -E '^\s*[^#]' "$PLUGIN_FILE" 2>/dev/null \
| grep "io\.kestra\." \
| cut -d':' -f1 \
| uniq | sort
)
PLUGINS_COUNT="${#PLUGINS_ARRAY[@]}"
else
echo "Plugin file not found: $PLUGIN_FILE"
exit 1
fi
else
PLUGINS_ARRAY=("${PLUGINS_ARGS[@]}")
PLUGINS_COUNT="${#PLUGINS_ARGS[@]}"
fi
# Extract the major and minor versions
# Extract major.minor (e.g. 0.21) to build the release branch name
BASE_VERSION=$(echo "$RELEASE_VERSION" | sed -E 's/^([0-9]+\.[0-9]+)\..*/\1/')
PUSH_RELEASE_BRANCH="releases/v${BASE_VERSION}.x"
## Get plugin list
echo "RELEASE_VERSION=$RELEASE_VERSION"
echo "NEXT_VERSION=$NEXT_VERSION"
echo "PUSH_RELEASE_BRANCH=$PUSH_RELEASE_BRANCH"
echo "GIT_BRANCH=$GIT_BRANCH"
echo "GIT_BRANCH=$GIT_BRANCH (fallback)"
echo "DRY_RUN=$DRY_RUN"
echo "Found ($PLUGINS_COUNT) plugin repositories:";
echo "ONLY_CHANGED=$ONLY_CHANGED"
echo "SINCE_TAG=${SINCE_TAG:-<auto>}"
echo "Found ($PLUGINS_COUNT) plugin repositories:"
for PLUGIN in "${PLUGINS_ARRAY[@]}"; do
echo "$PLUGIN"
echo " - $PLUGIN"
done
if [[ "$AUTO_YES" == false ]]; then
@@ -156,49 +203,77 @@ fi
###############################################################
# Main
###############################################################
mkdir -p $WORKING_DIR
mkdir -p "$WORKING_DIR"
COUNTER=1;
for PLUGIN in "${PLUGINS_ARRAY[@]}"
do
cd $WORKING_DIR;
COUNTER=1
for PLUGIN in "${PLUGINS_ARRAY[@]}"; do
cd "$WORKING_DIR"
echo "---------------------------------------------------------------------------------------"
echo "[$COUNTER/$PLUGINS_COUNT] Release Plugin: $PLUGIN"
echo "---------------------------------------------------------------------------------------"
if [[ -z "${GITHUB_PAT}" ]]; then
git clone git@github.com:kestra-io/$PLUGIN
# Clone the repo using SSH, otherwise PAT if provided
if [[ -z "${GITHUB_PAT:-}" ]]; then
git clone "git@github.com:kestra-io/${PLUGIN}.git"
else
echo "Clone git repository using GITHUB PAT"
git clone https://${GITHUB_PAT}@github.com/kestra-io/$PLUGIN.git
git clone "https://${GITHUB_PAT}@github.com/kestra-io/${PLUGIN}.git"
fi
cd "$PLUGIN";
if [[ "$PLUGIN" == "plugin-transform" ]] && [[ "$GIT_BRANCH" == "master" ]]; then # quickfix
git checkout main;
else
git checkout "$GIT_BRANCH";
cd "$PLUGIN"
# Determine the default branch dynamically to avoid hardcoding "master"/"main"
DEFAULT_BRANCH=$(detect_default_branch)
git checkout "$DEFAULT_BRANCH"
# Skip if the release tag already exists on remote (check both with and without 'v' prefix)
TAG_EXISTS=$(
{ git ls-remote --tags origin "refs/tags/v${RELEASE_VERSION}" \
&& git ls-remote --tags origin "refs/tags/${RELEASE_VERSION}"; } | wc -l
)
if [[ "$TAG_EXISTS" -ne 0 ]]; then
echo "Tag ${RELEASE_VERSION} already exists for $PLUGIN. Skipping..."
COUNTER=$(( COUNTER + 1 ))
continue
fi
# Change detection (if requested)
if [[ "$ONLY_CHANGED" == true ]]; then
git fetch --tags --quiet
git fetch origin "$DEFAULT_BRANCH" --quiet
BASE_TAG="$SINCE_TAG"
if [[ -z "$BASE_TAG" ]]; then
BASE_TAG=$(last_tag_or_empty)
fi
if has_changes_since_tag "$BASE_TAG" "$DEFAULT_BRANCH"; then
echo "Changes detected since ${BASE_TAG:-<no-tag>} on ${DEFAULT_BRANCH}, proceeding."
else
echo "No changes since ${BASE_TAG:-<no-tag>} on ${DEFAULT_BRANCH} for $PLUGIN. Skipping..."
COUNTER=$(( COUNTER + 1 ))
continue
fi
fi
if [[ "$DRY_RUN" == false ]]; then
CURRENT_BRANCH=$(git branch --show-current);
echo "Run gradle release for plugin: $PLUGIN";
echo "Branch: $CURRENT_BRANCH";
CURRENT_BRANCH=$(git branch --show-current)
echo "Run gradle release for plugin: $PLUGIN"
echo "Branch: $CURRENT_BRANCH"
if [[ "$AUTO_YES" == false ]]; then
askToContinue
fi
# Create and push release branch
git checkout -b "$PUSH_RELEASE_BRANCH";
git push -u origin "$PUSH_RELEASE_BRANCH";
# Create and push the release branch (branch that will hold the release versions)
git checkout -b "$PUSH_RELEASE_BRANCH"
git push -u origin "$PUSH_RELEASE_BRANCH"
# Run gradle release
git checkout "$CURRENT_BRANCH";
# Switch back to the working branch to run the gradle release
git checkout "$CURRENT_BRANCH"
# Run Gradle release with snapshot tolerance if releaseVersion contains -SNAPSHOT
if [[ "$RELEASE_VERSION" == *"-SNAPSHOT" ]]; then
# -SNAPSHOT qualifier maybe used to test release-candidates
./gradlew release -Prelease.useAutomaticVersion=true \
-Prelease.releaseVersion="${RELEASE_VERSION}" \
-Prelease.newVersion="${NEXT_VERSION}" \
@@ -211,19 +286,28 @@ do
-Prelease.pushReleaseVersionBranch="${PUSH_RELEASE_BRANCH}"
fi
git push;
# Update the upper bound version of kestra
# Push new commits/tags created by the release plugin
git push --follow-tags
# Update the upper bound version of Kestra on the release branch (e.g., [0.21,))
PLUGIN_KESTRA_VERSION="[${BASE_VERSION},)"
git checkout "$PUSH_RELEASE_BRANCH" && git pull;
git checkout "$PUSH_RELEASE_BRANCH" && git pull --ff-only
sed -i "s/^kestraVersion=.*/kestraVersion=${PLUGIN_KESTRA_VERSION}/" ./gradle.properties
git add ./gradle.properties
git commit -m"chore(deps): update kestraVersion to ${PLUGIN_KESTRA_VERSION}."
git push
sleep 5; # add a short delay to not spam Maven Central
else
echo "Skip gradle release [DRY_RUN=true]";
fi
COUNTER=$(( COUNTER + 1 ));
done;
exit 0;
# Commit only if there are actual changes staged
if ! git diff --cached --quiet; then
git commit -m "chore(deps): update kestraVersion to ${PLUGIN_KESTRA_VERSION}."
git push
fi
# Small delay to avoid hammering Maven Central
sleep 5
else
echo "Skip gradle release [DRY_RUN=true]"
fi
COUNTER=$(( COUNTER + 1 ))
done
exit 0

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.Variables;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.UnsupportedMessageException;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.runner.JdbcQueueTest;
@@ -31,7 +32,8 @@ class PostgresQueueTest extends JdbcQueueTest {
.build();
var exception = assertThrows(QueueException.class, () -> workerTaskResultQueue.emit(workerTaskResult));
assertThat(exception.getMessage()).isEqualTo("Unable to emit a message to the queue");
assertThat(exception).isInstanceOf(UnsupportedMessageException.class);
assertThat(exception.getMessage()).contains("ERROR: unsupported Unicode escape sequence");
assertThat(exception.getCause()).isInstanceOf(DataException.class);
}
}

View File

@@ -303,7 +303,7 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
.where(this.defaultFilter(tenantId, false))
.and(NORMAL_KIND_CONDITION);
select = this.filter(select, filters, "start_date", Resource.EXECUTION);
select = select.and(this.filter(filters, "start_date", Resource.EXECUTION));
return select;
}

View File

@@ -594,9 +594,9 @@ public abstract class AbstractJdbcFlowRepository extends AbstractJdbcRepository
}
@SuppressWarnings("unchecked")
private <R extends Record, E> SelectConditionStep<R> getFindFlowSelect(String tenantId, List<QueryFilter> filters, DSLContext context, List<Field<Object>> additionalFieldsToSelect) {
private <R extends Record> SelectConditionStep<R> getFindFlowSelect(String tenantId, List<QueryFilter> filters, DSLContext context, List<Field<Object>> additionalFieldsToSelect) {
var select = this.fullTextSelect(tenantId, context, additionalFieldsToSelect != null ? additionalFieldsToSelect : List.of());
select = this.filter(select, filters, null, Resource.FLOW);
select = select.and(this.filter(filters, null, Resource.FLOW));
return (SelectConditionStep<R>) select;
}

View File

@@ -8,7 +8,6 @@ import io.kestra.core.models.dashboards.DataFilterKPI;
import io.kestra.core.models.dashboards.filters.AbstractFilter;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.statistics.LogStatistics;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.utils.DateUtils;
@@ -25,17 +24,15 @@ import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository implements LogRepositoryInterface {
private static final Condition NORMAL_KIND_CONDITION = field("execution_kind").isNull();
public static final String DATE_COLUMN = "timestamp";
protected io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository;
public AbstractJdbcLogRepository(io.kestra.jdbc.AbstractJdbcRepository<LogEntry> jdbcRepository,
@@ -56,7 +53,7 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
protected Map<Logs.Fields, String> getFieldsMapping() {
return Map.of(
Logs.Fields.DATE, "timestamp",
Logs.Fields.DATE, DATE_COLUMN,
Logs.Fields.NAMESPACE, "namespace",
Logs.Fields.FLOW_ID, "flow_id",
Logs.Fields.TASK_ID, "task_id",
@@ -101,87 +98,16 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION);
select = this.filter(select, filters, "timestamp", Resource.LOG);
select = select.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
return this.jdbcRepository.fetchPage(context, select, pageable);
});
}
private <T extends Record> SelectConditionStep<T> filter(
SelectConditionStep<T> select,
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable String triggerId,
@Nullable Level minLevel,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
) {
select = addNamespace(select, namespace);
if (flowId != null) {
select = select.and(field("flow_id").eq(flowId));
}
if (triggerId != null) {
select = select.and(field("trigger_id").eq(triggerId));
}
select = addMinLevel(select, minLevel);
if (query != null) {
select = select.and(this.findCondition(query));
}
if (startDate != null) {
select = select.and(field("timestamp").greaterOrEqual(startDate.toOffsetDateTime()));
}
if (endDate != null) {
select = select.and(field("timestamp").lessOrEqual(endDate.toOffsetDateTime()));
}
return select;
}
private <T extends Record> SelectConditionStep<T> addMinLevel(SelectConditionStep<T> select,
Level minLevel) {
if (minLevel != null) {
select = select.and(minLevel(minLevel));
}
return select;
}
private static <T extends Record> SelectConditionStep<T> addNamespace(SelectConditionStep<T> select,
String namespace) {
if (namespace != null) {
select = select.and(DSL.or(field("namespace").eq(namespace), field("namespace").likeIgnoreCase(namespace + ".%")));
}
return select;
}
private static <T extends Record> SelectConditionStep<T> addFlowId(SelectConditionStep<T> select, String flowId) {
if (flowId != null) {
select = select.and(field("flow_id").eq(flowId));
}
return select;
}
private static <T extends Record> SelectConditionStep<T> addExecutionId(SelectConditionStep<T> select, String executionId) {
if (executionId != null) {
select = select.and(field("execution_id").eq(executionId));
}
return select;
}
@Override
public Flux<LogEntry> findAsync(
@Nullable String tenantId,
@Nullable String namespace,
@Nullable String flowId,
@Nullable String executionId,
@Nullable Level minLevel,
ZonedDateTime startDate
@Nullable String tenantId,
List<QueryFilter> filters
){
return Flux.create(emitter -> this.jdbcRepository
.getDslContextWrapper()
@@ -194,15 +120,11 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(NORMAL_KIND_CONDITION);
addNamespace(select, namespace);
addFlowId(select, flowId);
addExecutionId(select, executionId);
addMinLevel(select, minLevel);
select = select.and(field("timestamp").greaterThan(startDate.toOffsetDateTime()));
Select<Record1<Object>> query = this.jdbcRepository.buildQuery(context, select, "timestamp");
select = select.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
select.orderBy(field(DATE_COLUMN).asc());
try (Stream<Record1<Object>> stream = query.fetchSize(FETCH_SIZE).stream()){
try (Stream<Record1<Object>> stream = select.fetchSize(FETCH_SIZE).stream()){
stream.map((Record record) -> jdbcRepository.map(record))
.forEach(emitter::next);
} finally {
@@ -233,44 +155,6 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
}), FluxSink.OverflowStrategy.BUFFER);
}
private List<LogStatistics> fillDate(
List<LogStatistics> result,
ZonedDateTime startDate,
ZonedDateTime endDate,
ChronoUnit unit,
String format
) {
DateUtils.GroupType groupByType = DateUtils.groupByType(Duration.between(startDate, endDate));
List<LogStatistics> filledResult = new ArrayList<>();
ZonedDateTime currentDate = startDate;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format).withZone(ZoneId.systemDefault());
while (currentDate.isBefore(endDate)) {
String finalCurrentDate = currentDate.format(formatter);
LogStatistics current = result.stream()
.filter(metric -> formatter.format(metric.getTimestamp()).equals(finalCurrentDate))
.collect(Collectors.groupingBy(LogStatistics::getTimestamp))
.values()
.stream()
.map(logStatistics -> {
Map<Level, Long> collect = logStatistics
.stream()
.map(LogStatistics::getCounts)
.flatMap(levelLongMap -> levelLongMap.entrySet().stream())
.collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.summingLong(Map.Entry::getValue)));
return logStatistics.getFirst().toBuilder().counts(collect).build();
})
.findFirst()
.orElse(LogStatistics.builder().timestamp(currentDate.toInstant()).groupBy(groupByType.val()).build());
filledResult.add(current);
currentDate = currentDate.plus(1, unit);
}
return filledResult;
}
@Override
public List<LogEntry> findByExecutionId(String tenantId, String executionId, Level minLevel) {
return findByExecutionId(tenantId, executionId, minLevel, true);
@@ -517,10 +401,10 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
var delete = context
.delete(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId))
.and(field("timestamp").lessOrEqual(endDate.toOffsetDateTime()));
.and(field(DATE_COLUMN).lessOrEqual(endDate.toOffsetDateTime()));
if (startDate != null) {
delete = delete.and(field("timestamp").greaterOrEqual(startDate.toOffsetDateTime()));
delete = delete.and(field(DATE_COLUMN).greaterOrEqual(startDate.toOffsetDateTime()));
}
if (namespace != null) {
@@ -543,6 +427,22 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
});
}
@Override
public void deleteByFilters(String tenantId, List<QueryFilter> filters){
this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
var delete = context
.delete(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
delete = delete.and(this.filter(filters, DATE_COLUMN, Resource.LOG));
return delete.execute();
});
}
private ArrayListTotal<LogEntry> query(String tenantId, Condition condition, Level minLevel, Pageable pageable) {
return this.jdbcRepository
.getDslContextWrapper()
@@ -583,7 +483,7 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
}
return this.jdbcRepository.fetch(select
.orderBy(field("timestamp").sort(SortOrder.ASC))
.orderBy(field(DATE_COLUMN).sort(SortOrder.ASC))
);
});
}
@@ -632,17 +532,6 @@ public abstract class AbstractJdbcLogRepository extends AbstractJdbcRepository i
});
}
private Field<?> aggregate(String aggregation) {
return switch (aggregation) {
case "avg" -> DSL.avg(field("attempt_number", Double.class)).as("metric_value");
case "sum" -> DSL.sum(field("attempt_number", Double.class)).as("metric_value");
case "min" -> DSL.min(field("attempt_number", Double.class)).as("metric_value");
case "max" -> DSL.max(field("attempt_number", Double.class)).as("metric_value");
case "count" -> DSL.count().as("metric_value");
default -> throw new IllegalArgumentException("Invalid aggregation: " + aggregation);
};
}
@Override
public ArrayListTotal<Map<String, Object>> fetchData(
String tenantId,

View File

@@ -13,6 +13,7 @@ import io.kestra.core.models.flows.State;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.repositories.ExecutionRepositoryInterface.ChildFilter;
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.Enums;
import io.kestra.core.utils.ListUtils;
import io.kestra.jdbc.services.JdbcFilterService;
import io.micronaut.context.annotation.Value;
@@ -240,50 +241,50 @@ public abstract class AbstractJdbcRepository {
return column.getField() != null ? field(fieldsMapping.get(column.getField())) : null;
}
protected <T extends Record> SelectConditionStep<T> filter(
SelectConditionStep<T> select,
protected Condition filter(
List<QueryFilter> filters,
String dateColumn,
Resource resource
) {
List<Condition> conditions = new ArrayList<>();
if (filters != null) {
QueryFilter.validateQueryFilters(filters, resource);
for (QueryFilter filter : filters) {
QueryFilter.Field field = filter.field();
QueryFilter.Op operation = filter.operation();
Object value = filter.value();
select = getConditionOnField(select, field, value, operation, dateColumn);
conditions.add(getConditionOnField(field, value, operation, dateColumn));
}
}
return select;
return conditions.stream()
.reduce(DSL.noCondition(), Condition::and);
}
/**
*
* @param dateColumn the JDBC column name of the logical date to filter on with {@link io.kestra.core.models.QueryFilter.Field#START_DATE} and/or {@link QueryFilter.Field#END_DATE}
*/
protected <T extends Record> SelectConditionStep<T> getConditionOnField(
SelectConditionStep<T> select,
protected Condition getConditionOnField(
QueryFilter.Field field,
Object value,
QueryFilter.Op operation,
@Nullable String dateColumn
) {
if (field.equals(QueryFilter.Field.QUERY)) {
return handleQuery(select, value, operation);
return handleQuery(value, operation);
}
// Handling for Field.STATE
if (field.equals(QueryFilter.Field.STATE)) {
return select.and(generateStateCondition(value, operation));
return generateStateCondition(value, operation);
}
// Handle Field.CHILD_FILTER
if (field.equals(QueryFilter.Field.CHILD_FILTER)) {
return handleChildFilter(select, value);
return handleChildFilter(value);
}
// Handling for Field.MIN_LEVEL
if (field.equals(QueryFilter.Field.MIN_LEVEL)) {
return handleMinLevelField(select, value, operation);
return handleMinLevelField(value, operation);
}
// Special handling for START_DATE and END_DATE
@@ -294,16 +295,16 @@ public abstract class AbstractJdbcRepository {
OffsetDateTime dateTime = (value instanceof ZonedDateTime)
? ((ZonedDateTime) value).toOffsetDateTime()
: ZonedDateTime.parse(value.toString()).toOffsetDateTime();
return applyDateCondition(select, dateTime, operation, dateColumn);
return applyDateCondition(dateTime, operation, dateColumn);
}
if (field == QueryFilter.Field.SCOPE) {
return applyScopeCondition(select, value, operation);
return applyScopeCondition(value, operation);
}
if (field.equals(QueryFilter.Field.LABELS)) {
if (value instanceof Map<?, ?> map){
return select.and(findLabelCondition(map, operation));
return findLabelCondition(map, operation);
} else {
throw new InvalidQueryFiltersException("Label field value must but instance of Map");
}
@@ -313,37 +314,22 @@ public abstract class AbstractJdbcRepository {
Name columnName = DSL.quotedName(field.name().toLowerCase());
// Default handling for other fields
switch (operation) {
case EQUALS -> select = select.and(DSL.field(columnName).eq(value));
case NOT_EQUALS -> select = select.and(DSL.field(columnName).ne(value));
case GREATER_THAN -> select = select.and(DSL.field(columnName).greaterThan(value));
case LESS_THAN -> select = select.and(DSL.field(columnName).lessThan(value));
case IN -> {
if (value instanceof Collection<?>) {
select = select.and(DSL.field(columnName).in((Collection<?>) value));
} else {
throw new InvalidQueryFiltersException("IN operation requires a collection as value");
}
}
case NOT_IN -> {
if (value instanceof Collection<?>) {
select = select.and(DSL.field(columnName).notIn((Collection<?>) value));
} else {
throw new InvalidQueryFiltersException("NOT_IN operation requires a collection as value");
}
}
case STARTS_WITH -> select = select.and(DSL.field(columnName).like(value + "%"));
return switch (operation) {
case EQUALS -> DSL.field(columnName).eq(value);
case NOT_EQUALS -> DSL.field(columnName).ne(value);
case GREATER_THAN -> DSL.field(columnName).greaterThan(value);
case LESS_THAN -> DSL.field(columnName).lessThan(value);
case IN -> DSL.field(columnName).in(ListUtils.convertToList(value));
case NOT_IN -> DSL.field(columnName).notIn(ListUtils.convertToList(value));
case STARTS_WITH -> DSL.field(columnName).like(value + "%");
case ENDS_WITH -> select = select.and(DSL.field(columnName).like("%" + value));
case CONTAINS -> select = select.and(DSL.field(columnName).like("%" + value + "%"));
case REGEX -> select = select.and(DSL.field(columnName).likeRegex((String) value));
case PREFIX -> select = select.and(
DSL.field(columnName).like(value + ".%")
.or(DSL.field(columnName).eq(value))
);
case ENDS_WITH -> DSL.field(columnName).like("%" + value);
case CONTAINS -> DSL.field(columnName).like("%" + value + "%");
case REGEX -> DSL.field(columnName).likeRegex((String) value);
case PREFIX -> DSL.field(columnName).like(value + ".%")
.or(DSL.field(columnName).eq(value));
default -> throw new InvalidQueryFiltersException("Unsupported operation: " + operation);
}
return select;
};
}
protected Condition findQueryCondition(String query) {
@@ -378,43 +364,36 @@ public abstract class AbstractJdbcRepository {
.in(state.stream().map(Enum::name).toList());
}
private <T extends Record> SelectConditionStep<T> handleQuery(SelectConditionStep<T> select,
Object value,
QueryFilter.Op operation) {
private Condition handleQuery(Object value, QueryFilter.Op operation) {
Condition condition = findQueryCondition(value.toString());
return switch (operation) {
case EQUALS -> select.and(condition);
case NOT_EQUALS -> select.andNot(condition);
case EQUALS -> condition;
case NOT_EQUALS -> condition.not();
default -> throw new InvalidQueryFiltersException("Unsupported operation for QUERY field: " + operation);
};
}
// Handle CHILD_FILTER field logic
private <T extends Record> SelectConditionStep<T> handleChildFilter(SelectConditionStep<T> select, Object value) {
private Condition handleChildFilter(Object value) {
ChildFilter childFilter = (value instanceof String val) ? ChildFilter.valueOf(val) : (ChildFilter) value;
return switch (childFilter) {
case CHILD -> select.and(field("trigger_execution_id").isNotNull());
case MAIN -> select.and(field("trigger_execution_id").isNull());
case CHILD -> field("trigger_execution_id").isNotNull();
case MAIN -> field("trigger_execution_id").isNull();
};
}
private <T extends Record> SelectConditionStep<T> handleMinLevelField(
SelectConditionStep<T> select,
Object value,
QueryFilter.Op operation
) {
private Condition handleMinLevelField(Object value, QueryFilter.Op operation) {
Level minLevel = value instanceof Level ? (Level) value : Level.valueOf((String) value);
switch (operation) {
case EQUALS -> select = select.and(minLevelCondition(minLevel));
case NOT_EQUALS -> select = select.and(minLevelCondition(minLevel).not());
return switch (operation) {
case EQUALS -> minLevelCondition(minLevel);
case NOT_EQUALS -> minLevelCondition(minLevel).not();
default -> throw new InvalidQueryFiltersException(
"Unsupported operation for MIN_LEVEL: " + operation
);
}
return select;
};
}
private Condition minLevelCondition(Level minLevel) {
@@ -425,53 +404,32 @@ public abstract class AbstractJdbcRepository {
return field("level").in(levels.stream().map(level -> level.name()).toList());
}
private <T extends Record> SelectConditionStep<T> applyDateCondition(
SelectConditionStep<T> select, OffsetDateTime dateTime, QueryFilter.Op operation, String fieldName
) {
switch (operation) {
case LESS_THAN -> select = select.and(field(fieldName).lessThan(dateTime));
case LESS_THAN_OR_EQUAL_TO -> select = select.and(field(fieldName).lessOrEqual(dateTime));
case GREATER_THAN -> select = select.and(field(fieldName).greaterThan(dateTime));
case GREATER_THAN_OR_EQUAL_TO -> select = select.and(field(fieldName).greaterOrEqual(dateTime));
case EQUALS -> select = select.and(field(fieldName).eq(dateTime));
case NOT_EQUALS -> select = select.and(field(fieldName).ne(dateTime));
private Condition applyDateCondition(OffsetDateTime dateTime, QueryFilter.Op operation, String fieldName) {
return switch (operation) {
case LESS_THAN -> field(fieldName).lessThan(dateTime);
case LESS_THAN_OR_EQUAL_TO -> field(fieldName).lessOrEqual(dateTime);
case GREATER_THAN -> field(fieldName).greaterThan(dateTime);
case GREATER_THAN_OR_EQUAL_TO -> field(fieldName).greaterOrEqual(dateTime);
case EQUALS -> field(fieldName).eq(dateTime);
case NOT_EQUALS -> field(fieldName).ne(dateTime);
default ->
throw new InvalidQueryFiltersException("Unsupported operation for date condition: " + operation);
}
return select;
};
}
private <T extends Record> SelectConditionStep<T> applyScopeCondition(
SelectConditionStep<T> select, Object value, QueryFilter.Op operation) {
if (!(value instanceof List<?> scopeValues)) {
throw new InvalidQueryFiltersException("Invalid value for SCOPE filtering");
private Condition applyScopeCondition(Object value, QueryFilter.Op operation) {
List<FlowScope> flowScopes = Enums.fromList(value, FlowScope.class);
if (flowScopes.size() > 1){
throw new InvalidQueryFiltersException("Only one scope can be use in the same time");
}
FlowScope scope = flowScopes.getFirst();
List<FlowScope> validScopes = Arrays.stream(FlowScope.values()).toList();
if (!validScopes.containsAll(scopeValues)) {
throw new InvalidQueryFiltersException("Scope values must be a subset of FlowScope");
}
if (operation != QueryFilter.Op.EQUALS && operation != QueryFilter.Op.NOT_EQUALS) {
throw new InvalidQueryFiltersException("Unsupported operation for SCOPE: " + operation);
}
boolean isEqualsOperation = (operation == QueryFilter.Op.EQUALS);
String systemNamespace = this.getSystemFlowNamespace();
if (scopeValues.contains(FlowScope.USER)) {
Condition userCondition = isEqualsOperation
? field("namespace").ne(systemNamespace)
: field("namespace").eq(systemNamespace);
select = select.and(userCondition);
} else if (scopeValues.contains(FlowScope.SYSTEM)) {
Condition systemCondition = isEqualsOperation
? field("namespace").eq(systemNamespace)
: field("namespace").ne(systemNamespace);
select = select.and(systemCondition);
}
return select;
return switch (operation){
case EQUALS -> FlowScope.USER.equals(scope) ? field("namespace").ne(systemNamespace) : field("namespace").eq(systemNamespace);
case NOT_EQUALS -> FlowScope.USER.equals(scope) ? field("namespace").eq(systemNamespace) : field("namespace").ne(systemNamespace);
default -> throw new InvalidQueryFiltersException("Unsupported operation for SCOPE: " + operation);
};
}

View File

@@ -305,7 +305,7 @@ public abstract class AbstractJdbcTriggerRepository extends AbstractJdbcReposito
.from(this.jdbcRepository.getTable())
.where(this.defaultFilter(tenantId));
return filter(select, filters, "next_execution_date", Resource.TRIGGER);
return select.and(filter(filters, "next_execution_date", Resource.TRIGGER));
}
@Override

View File

@@ -20,6 +20,12 @@ public class AbstractJdbcExecutionRunningStorage extends AbstractJdbcRepository
this.jdbcRepository = jdbcRepository;
}
public void save(ExecutionRunning executionRunning) {
jdbcRepository.getDslContextWrapper().transaction(
configuration -> save(DSL.using(configuration), executionRunning)
);
}
public void save(DSLContext dslContext, ExecutionRunning executionRunning) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executionRunning);
this.jdbcRepository.persist(executionRunning, dslContext, fields);

View File

@@ -546,7 +546,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
}
// create an SLA monitor if needed
if (execution.getState().getCurrent() == State.Type.CREATED && !ListUtils.isEmpty(flow.getSla())) {
if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && !ListUtils.isEmpty(flow.getSla())) {
List<SLAMonitor> monitors = flow.getSla().stream()
.filter(ExecutionMonitoringSLA.class::isInstance)
.map(ExecutionMonitoringSLA.class::cast)
@@ -562,7 +562,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
// handle concurrency limit, we need to use a different queue to be sure that execution running
// are processed sequentially so inside a queue with no parallelism
if (execution.getState().getCurrent() == State.Type.CREATED && flow.getConcurrency() != null) {
if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && flow.getConcurrency() != null) {
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(executor.getFlow().getTenantId())
.namespace(executor.getFlow().getNamespace())
@@ -1121,8 +1121,16 @@ public class JdbcExecutor implements ExecutorInterface, Service {
executor.getFlow().getId(),
throwConsumer(queued -> {
var newExecution = queued.withState(State.Type.RUNNING);
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(newExecution.getTenantId())
.namespace(newExecution.getNamespace())
.flowId(newExecution.getFlowId())
.execution(newExecution)
.concurrencyState(ExecutionRunning.ConcurrencyState.RUNNING)
.build();
executionRunningStorage.save(executionRunning);
executionQueue.emit(newExecution);
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
})
);
}

View File

@@ -7,16 +7,13 @@ import com.google.common.collect.Iterables;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.queues.*;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JdbcTableConfigs;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.core.queues.MessageTooBigException;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
@@ -151,6 +148,11 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
.execute();
});
} catch (DataException e) { // The exception is from the data itself, not the database/network/driver so instead of fail fast, we throw a recoverable QueueException
// Postgres refuses to store JSONB with the '\0000' codepoint as it has no textual representation.
// We try to detect that and fail with a specific exception so the Worker can recover from it.
if (e.getMessage() != null && e.getMessage().contains("ERROR: unsupported Unicode escape sequence")) {
throw new UnsupportedMessageException(e.getMessage(), e);
}
throw new QueueException("Unable to emit a message to the queue", e);
}
@@ -385,11 +387,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
Result<Record> result = this.receiveFetch(ctx, consumerGroup, queueName, forUpdate);
if (!result.isEmpty()) {
if (inTransaction) {
consumer.accept(ctx, this.map(result));
}
if (!result.isEmpty() && inTransaction) {
consumer.accept(ctx, this.map(result));
this.updateGroupOffsets(
ctx,
consumerGroup,
@@ -403,6 +402,13 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
if (!inTransaction) {
consumer.accept(null, this.map(fetch));
dslContextWrapper.transaction(configuration ->
this.updateGroupOffsets(
DSL.using(configuration),
consumerGroup,
queueName,
fetch.map(record -> record.get("offset", Integer.class))
));
}
pollSize.set(fetch.size());

View File

@@ -30,9 +30,9 @@ dependencies {
api platform("io.micronaut.platform:micronaut-platform:4.8.2")
api platform("io.qameta.allure:allure-bom:2.29.1")
// we define cloud bom here for GCP, Azure and AWS so they are aligned for all plugins that use them (secret, storage, oss and ee plugins)
api platform('com.google.cloud:libraries-bom:26.64.0')
api platform("com.azure:azure-sdk-bom:1.2.36")
api platform('software.amazon.awssdk:bom:2.32.11')
api platform('com.google.cloud:libraries-bom:26.65.0')
api platform("com.azure:azure-sdk-bom:1.2.37")
api platform('software.amazon.awssdk:bom:2.32.16')
constraints {
@@ -60,12 +60,12 @@ dependencies {
api("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion")
api("com.fasterxml.uuid:java-uuid-generator:$jugVersion")
// issue with the Docker lib having a too old version for the k8s extension
api("org.apache.commons:commons-compress:1.27.1")
api("org.apache.commons:commons-compress:1.28.0")
// Kafka
api "org.apache.kafka:kafka-clients:$kafkaVersion"
api "org.apache.kafka:kafka-streams:$kafkaVersion"
// AWS CRT is not included in the AWS BOM but needed for the S3 Transfer manager
api 'software.amazon.awssdk.crt:aws-crt:0.38.7'
api 'software.amazon.awssdk.crt:aws-crt:0.38.8'
// we need at least 0.14, it could be removed when Micronaut contains a recent only version in their BOM
api "io.micrometer:micrometer-core:1.15.2"
@@ -104,7 +104,7 @@ dependencies {
api group: 'com.github.victools', name: 'jsonschema-module-jackson', version: jsonschemaVersion
api group: 'com.github.victools', name: 'jsonschema-module-swagger-2', version: jsonschemaVersion
api 'com.h2database:h2:2.3.232'
api 'com.mysql:mysql-connector-j:9.3.0'
api 'com.mysql:mysql-connector-j:9.4.0'
api 'org.postgresql:postgresql:42.7.7'
api 'com.github.docker-java:docker-java:3.5.3'
api 'com.github.docker-java:docker-java-transport-httpclient5:3.5.3'
@@ -115,7 +115,7 @@ dependencies {
api "org.xhtmlrenderer:flying-saucer-core:$flyingSaucerVersion"
api "org.xhtmlrenderer:flying-saucer-pdf:$flyingSaucerVersion"
api group: 'jakarta.mail', name: 'jakarta.mail-api', version: '2.1.3'
api group: 'org.eclipse.angus', name: 'jakarta.mail', version: '2.0.3'
api group: 'org.eclipse.angus', name: 'jakarta.mail', version: '2.0.4'
api group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.2.2'
api group: 'de.siegmar', name: 'fastcsv', version: '4.0.0'
// Json Diff

View File

@@ -38,6 +38,10 @@ public abstract class AbstractTaskRunnerTest {
@Test
protected void run() throws Exception {
var runContext = runContext(this.runContextFactory);
simpleRun(runContext);
}
private void simpleRun(RunContext runContext) throws Exception {
var commands = initScriptCommands(runContext);
Mockito.when(commands.getCommands()).thenReturn(
Property.ofValue(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'")))
@@ -166,6 +170,13 @@ public abstract class AbstractTaskRunnerTest {
assertThat(taskException.getLogConsumer().getOutputs().get("logOutput")).isEqualTo("Hello World");
}
@Test
protected void canWorkMultipleTimeInSameWdir() throws Exception {
var runContext = runContext(this.runContextFactory);
simpleRun(runContext);
simpleRun(runContext);
}
protected RunContext runContext(RunContextFactory runContextFactory) {
return this.runContext(runContextFactory, null);
}
@@ -236,4 +247,4 @@ public abstract class AbstractTaskRunnerTest {
protected boolean needsToSpecifyWorkingDirectory() {
return false;
}
}
}

View File

@@ -4,7 +4,7 @@ plugins {
node {
download = true
version = '22.11.0'
version = '22.12.0'
}
tasks.register('assembleFrontend', NpmTask) {

757
ui/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -24,21 +24,22 @@
},
"dependencies": {
"@js-joda/core": "^5.6.5",
"@kestra-io/ui-libs": "^0.0.232",
"@kestra-io/ui-libs": "^0.0.237",
"@vue-flow/background": "^1.3.2",
"@vue-flow/controls": "^1.1.2",
"@vue-flow/core": "^1.45.0",
"@vueuse/core": "^13.5.0",
"@vueuse/core": "^13.6.0",
"ansi-to-html": "^0.7.2",
"axios": "^1.11.0",
"bootstrap": "^5.3.7",
"buffer": "^6.0.3",
"chart.js": "^4.5.0",
"core-js": "^3.44.0",
"core-js": "^3.45.0",
"cronstrue": "^3.2.0",
"cytoscape": "^3.33.0",
"dagre": "^0.8.5",
"el-table-infinite-scroll": "^3.0.7",
"element-plus": "^2.10.4",
"element-plus": "^2.10.5",
"humanize-duration": "^3.33.0",
"js-yaml": "^4.1.0",
"lodash": "^4.17.21",
@@ -56,12 +57,12 @@
"moment-timezone": "^0.5.46",
"nprogress": "^0.2.0",
"path-browserify": "^1.0.1",
"pdfjs-dist": "^5.3.93",
"pdfjs-dist": "^5.4.54",
"pinia": "^3.0.3",
"posthog-js": "^1.257.2",
"posthog-js": "^1.258.5",
"rapidoc": "^9.3.8",
"semver": "^7.7.2",
"shiki": "^3.8.1",
"shiki": "^3.9.2",
"splitpanes": "^3.2.0",
"throttle-debounce": "^5.0.2",
"vue": "^3.5.18",
@@ -77,58 +78,58 @@
"vue3-tour": "github:kestra-io/vue3-tour",
"vuex": "^4.1.0",
"xss": "^1.0.15",
"yaml": "^2.7.1"
"yaml": "^2.8.1"
},
"devDependencies": {
"@codecov/vite-plugin": "^1.9.1",
"@esbuild-plugins/node-modules-polyfill": "^0.2.2",
"@eslint/js": "^9.31.0",
"@playwright/test": "^1.54.1",
"@eslint/js": "^9.32.0",
"@playwright/test": "^1.54.2",
"@rushstack/eslint-patch": "^1.12.0",
"@shikijs/markdown-it": "^3.8.1",
"@storybook/addon-themes": "^9.0.18",
"@storybook/addon-vitest": "^9.0.18",
"@shikijs/markdown-it": "^3.9.2",
"@storybook/addon-themes": "^9.1.1",
"@storybook/addon-vitest": "^9.1.1",
"@storybook/test-runner": "^0.23.0",
"@storybook/vue3-vite": "^9.0.18",
"@storybook/vue3-vite": "^9.1.1",
"@types/humanize-duration": "^3.27.4",
"@types/js-yaml": "^4.0.9",
"@types/moment": "^2.11.29",
"@types/node": "^24.1.0",
"@types/node": "^24.2.0",
"@types/nprogress": "^0.2.3",
"@types/path-browserify": "^1.0.3",
"@types/semver": "^7.7.0",
"@types/testing-library__jest-dom": "^5.14.9",
"@types/testing-library__user-event": "^4.1.1",
"@typescript-eslint/parser": "^8.38.0",
"@vitejs/plugin-vue": "^6.0.0",
"@typescript-eslint/parser": "^8.39.0",
"@vitejs/plugin-vue": "^6.0.1",
"@vitejs/plugin-vue-jsx": "^5.0.1",
"@vitest/browser": "^3.2.4",
"@vitest/coverage-v8": "^3.2.4",
"@vue/eslint-config-prettier": "^10.2.0",
"@vue/test-utils": "^2.4.6",
"@vueuse/router": "^13.5.0",
"@vueuse/router": "^13.6.0",
"change-case": "5.4.4",
"cross-env": "^7.0.3",
"decompress": "^4.2.1",
"eslint": "^9.31.0",
"eslint-plugin-storybook": "^9.0.18",
"eslint": "^9.32.0",
"eslint-plugin-storybook": "^9.1.1",
"eslint-plugin-vue": "^9.33.0",
"globals": "^16.3.0",
"husky": "^9.1.7",
"jsdom": "^26.1.0",
"lint-staged": "^16.1.2",
"lint-staged": "^16.1.4",
"monaco-editor": "^0.52.2",
"monaco-yaml": "5.3.1",
"patch-package": "^8.0.0",
"playwright": "^1.53.0",
"prettier": "^3.6.2",
"rollup-plugin-copy": "^3.5.0",
"sass": "^1.89.2",
"storybook": "^9.0.18",
"sass": "^1.90.0",
"storybook": "^9.1.1",
"storybook-vue3-router": "^5.0.0",
"ts-node": "^10.9.2",
"typescript": "^5.8.3",
"typescript-eslint": "^8.38.0",
"typescript-eslint": "^8.39.0",
"uuid": "^11.1.0",
"vite": "^6.3.5",
"vitest": "^3.2.4"
@@ -137,12 +138,12 @@
"@esbuild/darwin-arm64": "^0.25.8",
"@esbuild/darwin-x64": "^0.25.8",
"@esbuild/linux-x64": "^0.25.8",
"@rollup/rollup-darwin-arm64": "^4.45.1",
"@rollup/rollup-darwin-x64": "^4.45.1",
"@rollup/rollup-linux-x64-gnu": "^4.45.1",
"@swc/core-darwin-arm64": "^1.13.2",
"@swc/core-darwin-x64": "^1.13.2",
"@swc/core-linux-x64-gnu": "^1.13.2"
"@rollup/rollup-darwin-arm64": "^4.46.2",
"@rollup/rollup-darwin-x64": "^4.46.2",
"@rollup/rollup-linux-x64-gnu": "^4.46.2",
"@swc/core-darwin-arm64": "^1.13.3",
"@swc/core-darwin-x64": "^1.13.3",
"@swc/core-linux-x64-gnu": "^1.13.3"
},
"overrides": {
"bootstrap": {

View File

@@ -29,6 +29,7 @@
import {useMiscStore} from "./stores/misc";
import {useExecutionsStore} from "./stores/executions";
import * as BasicAuth from "./utils/basicAuth";
import {useFlowStore} from "./stores/flow";
// Main App
export default {
@@ -49,8 +50,7 @@
},
computed: {
...mapState("auth", ["user"]),
...mapState("flow", ["overallTotal"]),
...mapStores(useApiStore, usePluginsStore, useLayoutStore, useCoreStore, useDocStore, useMiscStore, useExecutionsStore),
...mapStores(useApiStore, usePluginsStore, useLayoutStore, useCoreStore, useDocStore, useMiscStore, useExecutionsStore, useFlowStore),
envName() {
return this.layoutStore.envName || this.miscStore.configs?.environment?.name;
},
@@ -183,12 +183,12 @@
$route: {
async handler(route) {
if(route.name === "home" && this.isOSS) {
await this.$store.dispatch("flow/findFlows", {size: 10, sort: "id:asc"})
await this.flowStore.findFlows({size: 10, sort: "id:asc"})
await this.executionsStore.findExecutions({size: 10}).then(response => {
this.executions = response?.total ?? 0;
})
if (!this.executions && !this.overallTotal) {
if (!this.executions && !this.flowStore.overallTotal) {
this.$router.push({name: "welcome", params: {tenant: this.$route.params.tenant}});
}
}

View File

@@ -1,8 +1,7 @@
<template>
<el-drawer
data-component="FILENAME_PLACEHOLDER"
:model-value="props.modelValue"
@update:model-value="emit('update:modelValue', $event)"
v-model="modelValue"
destroy-on-close
lock-scroll
size=""
@@ -16,7 +15,7 @@
<slot name="header" />
</span>
<el-button link class="full-screen">
<Fullscreen :title="$t('toggle fullscreen')" @click="toggleFullScreen" />
<Fullscreen :title="t('toggle fullscreen')" @click="toggleFullScreen" />
</el-button>
</template>
@@ -30,28 +29,28 @@
</el-drawer>
</template>
<script setup>
<script lang="ts" setup>
import {ref} from "vue";
import {useI18n} from "vue-i18n";
import Fullscreen from "vue-material-design-icons/Fullscreen.vue"
const {t} = useI18n();
const props = defineProps({
modelValue: {
type: Boolean,
required: true
},
title: {
type: String,
required: false,
default: undefined
},
fullScreen: {
type: Boolean,
required: false,
default: false
}
});
const emit = defineEmits(["update:modelValue"])
const modelValue = defineModel({
type: Boolean,
required: true
});
const fullScreen = ref(props.fullScreen);

View File

@@ -103,9 +103,9 @@
{{ t("multi_panel_editor.close_all_tabs") }}
</span>
</el-dropdown-item>
<el-dropdown-item
<el-dropdown-item
v-if="panel.activeTab?.value === 'code'"
:icon="Keyboard"
:icon="Keyboard"
@click="showKeyShortcuts()"
>
<span class="small-text">
@@ -175,8 +175,6 @@
import {CODE_PREFIX} from "./flows/useCodePanels";
import {useKeyShortcuts} from "../utils/useKeyShortcuts";
import {useStore} from "vuex"
const store = useStore()
import CloseIcon from "vue-material-design-icons/Close.vue"
import CircleMediumIcon from "vue-material-design-icons/CircleMedium.vue"
@@ -186,8 +184,9 @@
import DockRight from "vue-material-design-icons/DockRight.vue";
import Close from "vue-material-design-icons/Close.vue";
import Keyboard from "vue-material-design-icons/Keyboard.vue";
import {useEditorStore} from "../stores/editor";
const {t} = useI18n({useScope: "global"});
const {t} = useI18n();
const {showKeyShortcuts} = useKeyShortcuts();
function throttle(callback: () => void, limit: number): () => void {
@@ -248,18 +247,20 @@
const leftPanelDragover = ref(false);
const rightPanelDragover = ref(false);
const editorStore = useEditorStore()
const handleTabClick = (panel: Panel, tab: Tab) => {
panel.activeTab = tab
if(tab.value.startsWith(CODE_PREFIX)){
store.commit("editor/setCurrentTab", {
editorStore.current = {
dirty: tab.dirty ?? false,
extension: tab.value.split(".").pop(),
flow: tab.value === CODE_PREFIX,
name: tab.value,
path: tab.value,
persistent: tab.value === CODE_PREFIX,
});
}
}
};

View File

@@ -22,9 +22,9 @@
</el-tab-pane>
</el-tabs>
<section v-if="isEditorActiveTab || activeTab.component" data-component="FILENAME_PLACEHOLDER#container" ref="container" v-bind="$attrs" :class="{...containerClass, 'maximized': activeTab.maximized}">
<EditorSidebar v-if="isEditorActiveTab" ref="sidebar" :style="`flex: 0 0 calc(${explorerWidth}% - 11px);`" :current-n-s="namespace" v-show="explorerVisible" />
<div v-if="isEditorActiveTab && explorerVisible" @mousedown.prevent.stop="dragSidebar" class="slider" />
<div v-if="isEditorActiveTab" :style="`flex: 1 1 ${100 - (isEditorActiveTab && explorerVisible ? explorerWidth : 0)}%;`">
<EditorSidebar v-if="isEditorActiveTab" ref="sidebar" :style="`flex: 0 0 calc(${editorStore.explorerWidth}% - 11px);`" :current-n-s="namespace" v-show="editorStore.explorerVisible" />
<div v-if="isEditorActiveTab && editorStore.explorerVisible" @mousedown.prevent.stop="dragSidebar" class="slider" />
<div v-if="isEditorActiveTab" :style="`flex: 1 1 ${100 - (isEditorActiveTab && editorStore.explorerVisible ? editorStore.explorerWidth : 0)}%;`">
<component
v-bind="{...activeTab.props, ...attrsWithoutClass}"
v-on="activeTab['v-on'] ?? {}"
@@ -49,17 +49,18 @@
ref="tabContent"
:is="activeTab.component"
@go-to-detail="blueprintId => selectedBlueprintId = blueprintId"
:namespace
:embed="activeTab.props && activeTab.props.embed !== undefined ? activeTab.props.embed : true"
/>
</section>
</template>
<script>
import {mapState, mapMutations} from "vuex";
import EditorSidebar from "./inputs/EditorSidebar.vue";
import EnterpriseBadge from "./EnterpriseBadge.vue";
import BlueprintDetail from "./flows/blueprints/BlueprintDetail.vue";
import {useEditorStore} from "../stores/editor";
import {mapStores} from "pinia";
export default {
components: {EditorSidebar, EnterpriseBadge,BlueprintDetail},
@@ -120,7 +121,6 @@
this.setActiveName();
},
methods: {
...mapMutations("editor", ["changeExplorerWidth", "closeExplorer"]),
dragSidebar(e){
const SELF = this;
@@ -133,7 +133,7 @@
document.onmousemove = function onMouseMove(e) {
let percent = blockWidthPercent + ((e.clientX - dragX) / parentWidth) * 100;
SELF.changeExplorerWidth(percent)
SELF.editorStore.changeExplorerWidth(percent)
};
document.onmouseup = () => {
@@ -172,7 +172,7 @@
},
},
computed: {
...mapState("editor", ["explorerVisible", "explorerWidth"]),
...mapStores(useEditorStore),
containerClass() {
return this.getTabClasses(this.activeTab);
},
@@ -191,7 +191,7 @@
) {
if (TAB === "files") return true;
this.closeExplorer();
this.editorStore.closeExplorer();
return false;
}

View File

@@ -9,18 +9,19 @@
:position
:block-schema-path="blockSchemaPath"
@update-task="(e) => editorUpdate(e)"
@reorder="(yaml) => store.commit('flow/setFlowYaml', yaml)"
@reorder="(yaml) => flowStore.flowYaml = yaml"
@close-task="() => emit('closeTask')"
/>
</template>
<script setup lang="ts">
import {computed, provide, ref} from "vue";
import {computed, provide, ref, watch} from "vue";
import debounce from "lodash/debounce";
import {useStore} from "vuex";
import * as YAML_UTILS from "@kestra-io/ui-libs/flow-yaml-utils";
import NoCode from "./NoCode.vue";
import {CREATE_TASK_FUNCTION_INJECTION_KEY, EDIT_TASK_FUNCTION_INJECTION_KEY} from "./injectionKeys";
import {useEditorStore} from "../../stores/editor";
import {useFlowStore} from "../../stores/flow";
export interface NoCodeProps {
creatingTask?: boolean;
@@ -48,8 +49,8 @@
emit("editTask", parentPath, blockSchemaPath, refPath)
});
const store = useStore();
const flowYaml = computed<string>(() => store.state.flow.flowYaml);
const flowStore = useFlowStore();
const flowYaml = computed<string>(() => flowStore.flowYaml ?? "");
const lastValidFlowYaml = computed<string>(
(oldValue) => {
@@ -63,16 +64,17 @@
);
const validateFlow = debounce(() => {
store.dispatch("flow/validateFlow", {flow: flowYaml.value});
flowStore.validateFlow({flow: flowYaml.value});
}, 500);
const timeout = ref();
const editorStore = useEditorStore();
const editorUpdate = (source: string) => {
store.commit("flow/setFlowYaml", source);
store.commit("flow/setHaveChange", true);
flowStore.flowYaml = source;
flowStore.haveChange = true;
validateFlow();
store.commit("editor/setTabDirty", {
editorStore.setTabDirty({
name: "Flow",
dirty: true
});
@@ -80,11 +82,20 @@
// throttle the trigger of the flow update
clearTimeout(timeout.value);
timeout.value = setTimeout(() => {
store.dispatch("flow/onEdit", {
flowStore.onEdit({
source,
currentIsFlow: true,
topologyVisible: true,
});
}, 1000);
};
watch(
() => flowStore.flowYaml,
(newVal, oldVal) => {
if (newVal !== oldVal) {
editorUpdate(newVal);
}
}
);
</script>

View File

@@ -32,8 +32,8 @@
<script setup lang="ts">
import {onMounted, computed, inject, ref, provide, onActivated} from "vue";
import {useI18n} from "vue-i18n";
import {useStore} from "vuex";
import {usePluginsStore} from "../../../stores/plugins";
import {useFlowStore} from "../../../stores/flow";
import * as YAML_UTILS from "@kestra-io/ui-libs/flow-yaml-utils";
@@ -52,7 +52,6 @@
provide(SCHEMA_PATH_INJECTION_KEY, computed(() => pluginsStore.schemaType?.flow.$ref ?? ""));
const {t} = useI18n();
const store = useStore();
const emits = defineEmits([
"save",
@@ -95,8 +94,9 @@
document.addEventListener("keydown", saveEvent);
const flowStore = useFlowStore();
const creatingFlow = computed(() => {
return store.state.flow.isCreating;
return flowStore.isCreating;
});
const creatingTask = inject(CREATING_TASK_INJECTION_KEY);

View File

@@ -11,7 +11,6 @@
<script setup lang="ts">
import {ref, watch, computed, inject, nextTick} from "vue";
import {useStore} from "vuex";
import {SECTIONS} from "@kestra-io/ui-libs";
import * as YAML_UTILS from "@kestra-io/ui-libs/flow-yaml-utils";
import {PLUGIN_DEFAULTS_SECTION, SECTIONS_MAP} from "../../../utils/constants";
@@ -24,11 +23,9 @@
} from "../injectionKeys";
import TaskEditor from "../../../components/flows/TaskEditor.vue";
import ValidationError from "../../../components/flows/ValidationError.vue";
import {useFlowStore} from "../../../stores/flow";
const emits = defineEmits(["updateTask", "exitTask", "updateDocumentation"]);
const store = useStore();
const flow = inject(FLOW_INJECTION_KEY, ref(""));
const parentPath = inject(PARENT_PATH_INJECTION_KEY, "");
const refPath = inject(REF_PATH_INJECTION_KEY, undefined);
@@ -86,13 +83,14 @@
section.value === "triggers" ? SECTIONS.TRIGGERS : SECTIONS.TASKS
)
const flowStore = useFlowStore();
const validateTask = (task?: string) => {
if(section.value !== PLUGIN_DEFAULTS_SECTION && task){
clearTimeout(timer.value);
timer.value = setTimeout(() => {
if (lastValidatedValue.value !== task) {
lastValidatedValue.value = task;
store.dispatch("flow/validateTask", {
flowStore.validateTask({
task,
section: validationSection.value
});
@@ -104,7 +102,8 @@
const timer = ref<number>();
const lastValidatedValue = ref<string>();
const errors = computed(() => store.state.flow.taskError);
const errors = computed(() => flowStore.taskError?.split(/, ?/));
const saveTask = () => {
let result: string = flow.value;

View File

@@ -14,11 +14,11 @@
/>
</section>
<Sections :key :dashboard :charts :show-default="dashboard.id === 'default'" :padding="padding" />
<Sections ref="dashboardComponent" :dashboard :charts :show-default="dashboard.id === 'default'" :padding="padding" />
</template>
<script setup lang="ts">
import {computed, onBeforeMount, ref, watch} from "vue";
import {computed, onBeforeMount, ref, useTemplateRef} from "vue";
import type {Dashboard, Chart} from "./composables/useDashboards";
import {ALLOWED_CREATION_ROUTES, getDashboard, processFlowYaml} from "./composables/useDashboards";
@@ -43,8 +43,6 @@
import YAML_FLOW from "./assets/default_flow_definition.yaml?raw";
import YAML_NAMESPACE from "./assets/default_namespace_definition.yaml?raw";
import UTILS from "../../utils/utils.js";
import {useRoute, useRouter} from "vue-router";
const route = useRoute();
const router = useRouter();
@@ -65,21 +63,18 @@
const dashboard = ref<Dashboard>({id: "", charts: []});
const charts = ref<Chart[]>([]);
// We use a key to force re-rendering of the Sections component
let key = ref(UTILS.uid());
const loadCharts = async (allCharts: Chart[] = []) => {
charts.value = [];
for (const chart of allCharts) {
charts.value.push({...chart, content: stringify(chart)});
}
refreshCharts()
};
const dashboardComponent = useTemplateRef("dashboardComponent");
const refreshCharts = () => {
key.value = UTILS.uid();
dashboardComponent.value!.refreshCharts();
};
const load = async (id = "default", defaultYAML = YAML_MAIN) => {
@@ -104,8 +99,6 @@
if (props.isFlow && ID === "default") load("default", processFlowYaml(YAML_FLOW, route.params.namespace as string, route.params.id as string));
else if (props.isNamespace && ID === "default") load("default", YAML_NAMESPACE);
});
watch(route, async (_) => refreshCharts());
</script>
<style scoped lang="scss">

View File

@@ -92,6 +92,20 @@ export function defaultConfig(override, theme) {
);
}
export function extractState(value) {
if (!value || typeof value !== "string") return value;
if (value.includes(",")) {
const stateNames = State.arrayAllStates().map(state => state.name);
const matchedState = value.split(",")
.map(part => part.trim())
.find(part => stateNames.includes(part.toUpperCase()));
return matchedState || value;
}
return value;
}
export function chartClick(moment, router, route, event, parsedData, elements, type = "label") {
const query = {};
@@ -107,7 +121,7 @@ export function chartClick(moment, router, route, event, parsedData, elements, t
state = parsedData.labels[element.index];
}
if (state) {
query.state = state;
query.state = extractState(state);
query.scope = "USER";
query.size = 100;
query.page = 1;
@@ -137,7 +151,7 @@ export function chartClick(moment, router, route, event, parsedData, elements, t
}
if (event.state) {
query.state = event.state;
query.state = extractState(event.state);
}
if (route.query.namespace) {

View File

@@ -11,12 +11,12 @@
</template>
<script lang="ts" setup>
import {PropType, computed} from "vue";
import {PropType, computed, watch} from "vue";
import moment from "moment";
import {Bar} from "vue-chartjs";
import NoData from "../../layout/NoData.vue";
import type {Chart} from "../composables/useDashboards";
import {Chart, getDashboard} from "../composables/useDashboards";
import {useChartGenerator} from "../composables/useDashboards";
@@ -159,7 +159,19 @@
return {labels, datasets};
});
const {data: generated} = useChartGenerator(props);
const {data: generated, generate} = useChartGenerator(props);
function refresh() {
return generate(getDashboard(route, "id")!);
}
defineExpose({
refresh
});
watch(() => route.params.filters, () => {
refresh();
}, {deep: true});
</script>
<style lang="scss" scoped>
@@ -182,4 +194,4 @@
min-height: var(--chart-height);
max-height: var(--chart-height);
}
</style>
</style>

View File

@@ -10,12 +10,13 @@
</template>
<script setup lang="ts">
import {PropType} from "vue";
import {PropType, watch} from "vue";
import type {Chart} from "../composables/useDashboards";
import {Chart, getDashboard} from "../composables/useDashboards";
import {getChartTitle, getPropertyValue, useChartGenerator} from "../composables/useDashboards";
import NoData from "../../layout/NoData.vue";
import {useRoute} from "vue-router";
const props = defineProps({
chart: {type: Object as PropType<Chart>, required: true},
@@ -23,7 +24,21 @@
showDefault: {type: Boolean, default: false},
});
const {percentageShown, EMPTY_TEXT, data} = useChartGenerator(props);
const route = useRoute();
const {percentageShown, EMPTY_TEXT, data, generate} = useChartGenerator(props);
function refresh() {
return generate(getDashboard(route, "id")!);
}
defineExpose({
refresh
});
watch(() => route.params.filters, () => {
refresh();
}, {deep: true});
</script>
<style scoped lang="scss">

View File

@@ -7,7 +7,7 @@
</template>
<script setup lang="ts">
import {PropType, onMounted, watch, ref} from "vue";
import {PropType, watch, ref} from "vue";
import type {RouteLocation} from "vue-router";
@@ -34,9 +34,17 @@
else data.value = props.chart.content ?? props.chart.source?.content;
};
const dashboardID = (route: RouteLocation) => getDashboard(route, "id") || "default"
const dashboardID = (route: RouteLocation) => getDashboard(route, "id")!;
watch(route, async (changed) => await getData(dashboardID(changed)));
function refresh() {
return getData(dashboardID(route));
}
onMounted(async () => await getData(dashboardID(route)));
defineExpose({
refresh
});
watch(() => route.params.filters, () => {
refresh();
}, {deep: true, immediate: true});
</script>

View File

@@ -22,9 +22,9 @@
</template>
<script lang="ts" setup>
import {computed,PropType} from "vue";
import {computed, PropType, watch} from "vue";
import type {Chart} from "../composables/useDashboards";
import {Chart, getDashboard} from "../composables/useDashboards";
import {useChartGenerator} from "../composables/useDashboards";
@@ -183,7 +183,19 @@
};
});
const {data: generated} = useChartGenerator(props);
const {data: generated, generate} = useChartGenerator(props);
function refresh() {
return generate(getDashboard(route, "id")!);
}
defineExpose({
refresh
});
watch(() => route.params.filters, () => {
refresh();
}, {deep: true});
</script>
<style lang="scss" scoped>
@@ -192,4 +204,4 @@
.chart {
max-height: $height;
}
</style>
</style>

View File

@@ -56,6 +56,7 @@
<div class="flex-grow-1">
<component
ref="chartsComponents"
:is="TYPES[chart.type as keyof typeof TYPES]"
:chart
:filters
@@ -89,6 +90,18 @@
import Download from "vue-material-design-icons/Download.vue";
import Pencil from "vue-material-design-icons/Pencil.vue";
const chartsComponents = ref<{refresh(): void}[]>();
function refreshCharts() {
chartsComponents.value!.forEach((component) => {
component.refresh();
});
}
defineExpose({
refreshCharts
});
const props = defineProps<{
dashboard: Dashboard;
charts?: Chart[];

View File

@@ -34,7 +34,7 @@
</template>
<script lang="ts" setup>
import {PropType, onMounted, watch, ref, computed} from "vue";
import {PropType, watch, ref, computed} from "vue";
import type {RouteLocation} from "vue-router";
@@ -116,16 +116,24 @@
const dashboardID = (route: RouteLocation) => getDashboard(route, "id") as string;
const handlePageChange = async (options: { page: number; size: number }) => {
const handlePageChange = (options: { page: number; size: number }) => {
if (pageNumber.value === options.page && pageSize.value === options.size) return;
pageNumber.value = options.page;
pageSize.value = options.size;
getData(dashboardID(route));
return getData(dashboardID(route));
};
watch(route, async (changed) => getData(dashboardID(changed)));
function refresh() {
return getData(dashboardID(route));
}
onMounted(async () => getData(dashboardID(route)));
defineExpose({
refresh
});
watch(() => route.params.filters, () => {
refresh();
}, {deep: true, immediate: true});
</script>

View File

@@ -12,13 +12,13 @@
</template>
<script lang="ts" setup>
import {PropType, computed} from "vue";
import {PropType, computed, watch} from "vue";
import NoData from "../../layout/NoData.vue";
import {Bar} from "vue-chartjs";
import type {Chart} from "../composables/useDashboards";
import {Chart, getDashboard} from "../composables/useDashboards";
import {useChartGenerator} from "../composables/useDashboards";
@@ -264,7 +264,19 @@
: yDatasetData,
};
});
const {data: generated} = useChartGenerator(props);
const {data: generated, generate} = useChartGenerator(props);
function refresh() {
return generate(getDashboard(route, "id")!);
}
defineExpose({
refresh
});
watch(() => route.params.filters, () => {
refresh();
}, {deep: true});
</script>
<style lang="scss" scoped>
@@ -278,4 +290,4 @@
min-height: var(--chart-height);
max-height: var(--chart-height);
}
</style>
</style>

View File

@@ -8,7 +8,7 @@
}"
>
<template #message>
{{ $t('demos.audit-logs.message') }}
{{ t('demos.audit-logs.message') }}
</template>
</Layout>
</template>
@@ -31,6 +31,11 @@
}
});
defineOptions({
name: "AuditLogsDemo",
inheritAttrs: false,
});
const routeInfo = ref({
title: t("demos.audit-logs.title"),
});

View File

@@ -0,0 +1,102 @@
<template>
<Empty v-if="!loading && !getElements().length" type="dependencies" />
<el-splitter v-else class="dependencies">
<el-splitter-panel id="graph" v-bind="PANEL">
<div v-loading="loading" ref="container" />
<div class="controls">
<el-button size="small" :title="t('dependency.controls.zoom_in')" @click="handlers.zoomIn">
<Plus />
</el-button>
<el-button size="small" :title="t('dependency.controls.zoom_out')" @click="handlers.zoomOut">
<Minus />
</el-button>
<el-button size="small" :title="t('dependency.controls.clear_selection')" @click="handlers.clearSelection">
<SelectionRemove />
</el-button>
<el-button size="small" :title="t('dependency.controls.fit_view')" @click="handlers.fit">
<FitToScreenOutline />
</el-button>
</div>
</el-splitter-panel>
<el-splitter-panel id="table">
<Table :elements="getElements()" @select="selectNode" :selected="selectedNodeID" />
</el-splitter-panel>
</el-splitter>
</template>
<script setup lang="ts">
import {ref} from "vue";
import Table from "./components/Table.vue";
import Empty from "../layout/empty/Empty.vue";
import {useDependencies} from "./composables/useDependencies";
import {FLOW, EXECUTION} from "./utils/types";
const PANEL = {size: "70%", min: "30%", max: "80%"};
import {useRoute} from "vue-router";
const route = useRoute();
import {useI18n} from "vue-i18n";
const {t} = useI18n({useScope: "global"});
import Plus from "vue-material-design-icons/Plus.vue";
import Minus from "vue-material-design-icons/Minus.vue";
import SelectionRemove from "vue-material-design-icons/SelectionRemove.vue";
import FitToScreenOutline from "vue-material-design-icons/FitToScreenOutline.vue";
const SUBTYPE = route.name === "flows/update" ? FLOW : EXECUTION;
const container = ref(null);
const initialNodeID: string = SUBTYPE === FLOW ? String(route.params.id) : String(route.params.flowId);
const TESTING = false; // When true, bypasses API data fetching and uses mock/test data.
const {getElements, loading, selectedNodeID, selectNode, handlers} = useDependencies(container, SUBTYPE, initialNodeID, route.params, TESTING);
</script>
<style scoped lang="scss">
.dependencies {
display: flex;
width: 100%;
height: calc(100vh - 135px);
& div#graph {
position: relative; // for absolute positioning of controls
& > div:not(.controls) {
height: 100%;
overflow: hidden scroll;
background-color: transparent;
background-image: radial-gradient(circle, var(--ks-dots-topology) 1px, transparent 1px);
background-repeat: repeat;
background-size: 24px 24px;
}
& .controls {
position: absolute;
bottom: 10px;
left: 10px;
display: flex;
flex-direction: column;
justify-content: flex-end;
gap: 0.25rem;
& > button {
width: 2rem;
height: 2rem;
margin: 0;
}
}
}
& div#table {
display: flex;
flex-direction: column;
height: 100%;
}
}
</style>

View File

@@ -0,0 +1,29 @@
<template>
<RouterLink v-if="to" :to>
<code class="link">{{ props.node.flow }}</code>
</RouterLink>
<code v-else class="link">{{ props.node.flow }}</code>
</template>
<script setup lang="ts">
import {computed} from "vue";
import {FLOW, EXECUTION, type Node} from "../utils/types";
const props = defineProps<{ node: Node, subtype: typeof FLOW | typeof EXECUTION}>();
const to = computed(() => {
const base = {namespace: props.node.namespace};
return {name: "flows/update", params: {...base, id: props.node.flow}};
});
</script>
<style scoped lang="scss">
code.link {
display: block;
max-width: 100%;
font-size: var(--font-size-sm);
color: var(--ks-content-id);
}
</style>

View File

@@ -0,0 +1,164 @@
<template>
<section id="input">
<el-input
v-model="search"
:placeholder="t('dependency.search.placeholder')"
clearable
/>
</section>
<el-table
:data="results"
:empty-text="t('dependency.search.no_results', {term: search})"
:show-header="false"
class="nodes"
@row-click="(row: { data: Node }) => emits('select', row.data.id)"
:row-class-name="({row}: { row: { data: Node } }) => row.data.id === props.selected ? 'selected' : ''"
>
<el-table-column>
<template #default="{row}">
<section id="row">
<section id="left">
<div id="link">
<Link :node="row.data" :subtype="row.data.metadata.subtype" />
</div>
<p class="description">
{{ row.data.namespace }}
</p>
</section>
<section id="right">
<Status
v-if="row.data.metadata.subtype === EXECUTION && row.data.metadata.state"
:status="row.data.metadata.state"
size="small"
/>
</section>
</section>
</template>
</el-table-column>
</el-table>
</template>
<script setup lang="ts">
import {watch, nextTick, ref, computed} from "vue";
import type cytoscape from "cytoscape";
import Link from "./Link.vue";
import Status from "../../Status.vue";
import {useI18n} from "vue-i18n";
const {t} = useI18n({useScope: "global"});
import {NODE, EXECUTION, type Node} from "../utils/types";
const emits = defineEmits<{ (e: "select", id: Node["id"]): void }>();
const props = defineProps<{
elements: cytoscape.ElementDefinition[];
selected: Node["id"] | undefined;
}>();
const focusSelectedRow = ()=>{
const row = document.querySelector<HTMLElement>(".el-table__row.selected");
if (!row) return;
row.scrollIntoView({behavior: "smooth", block: "center"});
}
watch(() => props.selected, async (ID) => {
if (!ID) return;
await nextTick();
focusSelectedRow();
});
const search = ref("");
const results = computed(() => {
const f = search.value.trim().toLowerCase();
const NODES = props.elements.filter(({data}) => data.type === NODE);
if (!f) return NODES;
return NODES.filter(({data}) => {
const {flow, namespace} = data;
return (flow?.toLowerCase().includes(f) || namespace?.toLowerCase().includes(f));
});
});
</script>
<style scoped lang="scss">
section#input {
position: sticky;
top: 0;
z-index: 10; // keeps it above table rows
padding: 0.5rem;
background-color: var(--ks-background-input);
:deep(.el-input__wrapper) {
box-shadow: none !important;
font-size: var(--font-size-sm);
}
}
.el-table.nodes {
outline: none;
border-radius: 0;
border-top: 1px solid var(--ks-border-primary);
:deep(.el-table__empty-text) {
width: 100%;
font-size: var(--font-size-sm);
}
& :deep(.el-table__row.selected) {
background-color: var(--ks-tag-background);
&:hover {
--el-table-row-hover-bg-color: var(--ks-tag-background-hover);
}
}
}
section#row {
display: flex;
justify-content: space-between;
align-items: center;
max-width: 100%;
padding: 0.75rem 0 0.75rem 0.75rem;
font-size: var(--font-size-xs);
cursor: pointer;
& section#left {
display: flex;
flex-direction: column;
flex: 1;
min-width: 0;
& * {
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
& > div#link {
width: fit-content;
}
& p.description {
margin: 0;
color: var(--ks-content-primary);
}
}
& section#right {
flex-shrink: 0;
margin-left: 0.5rem;
}
}
</style>

View File

@@ -0,0 +1,396 @@
import {onMounted, onBeforeUnmount, nextTick, watch, ref} from "vue";
import {useCoreStore} from "../../../stores/core";
import {useFlowStore} from "../../../stores/flow";
import {useExecutionsStore} from "../../../stores/executions";
import {useI18n} from "vue-i18n";
import type {Ref} from "vue";
import type {RouteParams} from "vue-router";
import {v4 as uuid} from "uuid";
import cytoscape from "cytoscape";
import {State, cssVariable} from "@kestra-io/ui-libs";
import {NODE, EDGE, FLOW, EXECUTION, type Node, type Edge, type Element} from "../utils/types";
import {getRandomNumber, getDependencies} from "../../../../tests/fixtures/dependencies/getDependencies";
import {edgeColors, style} from "../utils/style";
const SELECTED = "selected", FADED = "faded", HOVERED = "hovered", EXECUTIONS = "executions";
const options: Omit<cytoscape.CytoscapeOptions, "container" | "elements"> & {elements?: Element[]} = {
minZoom: 0.1,
maxZoom: 2,
wheelSensitivity: 0.025,
};
/**
* Layout options for the COSE layout algorithm used in cytoscape.
*
* @see {@link https://js.cytoscape.org/#layouts/cose | COSE layout options documentation}
*/
const layout: cytoscape.CoseLayoutOptions = {
name: "cose",
// Physical forces
nodeRepulsion: 2_000_000,
edgeElasticity: 100,
idealEdgeLength: 250,
// Gravity settings
gravity: 0.05,
// Layout iterations & cooling
numIter: 10_000,
initialTemp: 200,
minTemp: 1,
// Spacing and padding
padding: 50,
componentSpacing: 150,
// Node sizing
nodeDimensionsIncludeLabels: true,
};
/**
* Sets the size of each node in the cytoscape instance
* based on the number of connected edges.
*
* The node size is calculated as: `baseSize + count * scale`,
* capped at `maxSize`.
*
* @param cy - The cytoscape core instance containing the graph.
* @param baseSize - The base size of each node. Default is 20.
* @param scale - The scale factor for each connected edge. Default is 2.
* @param maxSize - The maximum allowed size for a node. Default is 100.
*/
export function setNodeSizes(cy: cytoscape.Core, baseSize = 20, scale = 2, maxSize = 100): void {
cy.nodes().forEach((node) => {
const count = node.connectedEdges().length;
let size = baseSize + count * scale;
if (size > maxSize) size = maxSize;
node.style({width: size, height: size});
});
}
/**
* Retrieves the execution state color for a given cytoscape node or a provided state string.
*
* - If a `state` is provided, it will be used directly.
* - If not, it attempts to read the state from the node's `metadata`.
* - Falls back to a default color if no state is available.
*
* @param node - Optional cytoscape node to extract the state from.
* @param state - Optional direct state string.
* @returns The color associated with the execution state, or a fallback if missing.
*/
function getStateColor(node?: cytoscape.NodeSingular, state?: string): string {
const resolved = state ?? node?.data("metadata")?.state;
return resolved ? State.getStateColor(resolved) : cssVariable("--ks-dependencies-node-background")!;
}
/**
* Applies execution state colors to specified nodes in the cytoscape graph.
*
* - Removes all custom classes from nodes and edges.
* - Sets each nodes background and border color based on its execution state.
*
* @param cy - The cytoscape core instance managing the graph.
* @param nodes - Optional array of nodes to apply colors to. If not provided, all nodes are used.
*/
function setExecutionNodeColors(cy: cytoscape.Core, nodes?: cytoscape.NodeSingular[]): void {
// Remove all existing custom classes from the graph
clearClasses(cy, EXECUTION);
// Apply state-based colors to provided nodes or all nodes
(nodes ?? cy.nodes()).forEach((node) => {
node.style({
"background-color": getStateColor(node),
"border-color": getStateColor(node)
});
});
}
/**
* Applies the given color to specified edges in the cytoscape graph.
*
* - Removes the `FADED` class and adds the `EXECUTIONS` class to each edge.
* - Sets the edges line and arrow colors using the provided color.
*
* @param edges - Array of edges to apply colors to.
* @param color - The color to apply to edges.
*/
function setExecutionEdgeColors(edges: cytoscape.EdgeCollection, color: string): void {
edges.forEach((edge) => {
edge.removeClass(FADED).addClass(EXECUTIONS).style({"line-color": color, "target-arrow-color": color});
});
}
/**
* Removes the specified CSS classes from all elements (nodes and edges) in the cytoscape instance.
*
* If the subtype is "EXECUTION", it also reapplies the default edge styling.
*
* This function is typically used to clear selection, hover, and execution-related classes
* before applying new styles or resetting the graph state.
*
* @param cy - The cytoscape core instance containing the graph elements.
* @param subtype - The dependency subtype, either "FLOW" or "EXECUTION".
* Edge styles are only reset when subtype is "EXECUTION".
* @param classes - An array of class names to remove from all elements.
* Defaults to ["selected", "faded", "hovered", "executions"].
*/
export function clearClasses(cy: cytoscape.Core, subtype: typeof FLOW | typeof EXECUTION, classes: string[] = [SELECTED, FADED, HOVERED, EXECUTIONS]): void {
cy.elements().removeClass(classes.join(" "));
if (subtype === EXECUTION) cy.edges().style(edgeColors());
}
/**
* Fits the cytoscape viewport to include all elements, with default or specified padding.
*
* @param cy - The cytoscape core instance containing the graph.
* @param padding - The number of pixels to pad around the elements (default: 50).
*/
export function fit(cy: cytoscape.Core, padding: number = 50): void {
cy.fit(undefined, padding);
}
/**
* Handles selecting a node in the cytoscape graph.
*
* - Removes all existing "selected", "faded", "hovered" and "executions" states from nodes and edges.
* - Marks the chosen node as selected.
* - Applies a faded style to connected elements based on the subtype:
* - FLOW: Fades both connected edges and neighbor nodes.
* - EXECUTION: Highlights connected edges with execution color, fades neighbor nodes.
* - Updates the provided Vue ref with the selected nodes ID.
* - Smoothly centers and zooms the viewport on the selected node.
*
* @param cy - The cytoscape core instance managing the graph.
* @param node - The node element to select.
* @param selected - Vue ref storing the currently selected node ID.
* @param subtype - Determines how connected elements are highlighted ("FLOW" or "EXECUTION").
* @param id - Optional explicit ID to assign to the ref (defaults to the nodes own ID).
*/
function selectHandler(cy: cytoscape.Core, node: cytoscape.NodeSingular, selected: Ref<Node["id"] | undefined>, subtype: typeof FLOW | typeof EXECUTION, id?: Node["id"]): void {
// Remove all "selected", "faded", "hovered" and "executions" classes from every element
clearClasses(cy, subtype);
// Mark the chosen node as selected
node.addClass(SELECTED);
if (subtype === FLOW) {
// FLOW: Fade both connected edges and neighbor nodes
node.connectedEdges().union(node.connectedEdges().connectedNodes()).addClass(FADED);
} else {
// EXECUTION: Highlight connected edges with execution color
setExecutionEdgeColors(node.connectedEdges(), getStateColor(node));
}
// Update the Vue ref with the selected nodes ID
selected.value = id ?? node.id();
// Center and zoom the viewport on the selected node
cy.animate({center: {eles: node}, zoom: 1.2}, {duration: 500});
}
/**
* Sets up hover handlers for nodes and edges.
*
* @param cy - The cytoscape core instance containing the graph.
*/
function hoverHandler(cy: cytoscape.Core): void {
["node", "edge"].forEach((type) => {
cy.on("mouseover", type, (event: cytoscape.EventObject) => event.target.addClass(HOVERED));
cy.on("mouseout", type, (event: cytoscape.EventObject) => event.target.removeClass(HOVERED));
});
}
/**
* Initializes and manages a cytoscape instance within a Vue component.
*
* @param container - Vue ref pointing to the DOM element that hosts the cytoscape graph.
* @param subtype - Dependency subtype, either `"FLOW"` or `"EXECUTION"`. Defaults to `"FLOW"`.
* @param initialNodeID - Optional ID of the node to preselect after layout completes.
* @param params - Vue Router params, expected to include `id` and `namespace`.
* @param isTesting - When true, bypasses API data fetching and uses mock/test data.
* @returns An object with element getters, loading state, selected node ID,
* selection helpers, and control handlers.
*/
export function useDependencies(container: Ref<HTMLElement | null>, subtype: typeof FLOW | typeof EXECUTION = FLOW, initialNodeID: string, params: RouteParams, isTesting = false) {
const coreStore = useCoreStore();
const flowStore = useFlowStore();
const executionsStore = useExecutionsStore();
const {t} = useI18n({useScope: "global"});
let cy: cytoscape.Core;
const loading = ref(true);
const selectedNodeID: Ref<Node["id"] | undefined> = ref(undefined);
/**
* Selects a node in the cytoscape graph by its ID.
*
* @param id - The ID of the node to select.
*/
const selectNode = (id: Node["id"]): void => {
if (!cy) return;
const node = cy.getElementById(id);
if (node.nonempty()) {
selectHandler(cy, node, selectedNodeID, subtype, id);
}
};
let elements: { data: cytoscape.ElementDefinition[]; count: number } = {data: [], count: 0};
onMounted(async () => {
if (!container.value) return;
if(isTesting) elements = {data: getDependencies({subtype}), count: getRandomNumber(1, 100)};
else elements = await flowStore.loadDependencies({id: (subtype === FLOW ? params.id : params.flowId) as string, namespace: params.namespace as string, subtype});
if(subtype === EXECUTION) nextTick(() => openSSE());
cy = cytoscape({container: container.value, layout, ...options, style, elements: elements.data});
// Hide nodes immediately after initialization to avoid visual flickering or rearrangement during layout setup
cy.ready(() => cy.nodes().style("display", "none"));
// Dynamically size nodes based on connectivity
setNodeSizes(cy);
// Apply execution state colors to each node
if(subtype === EXECUTION) setExecutionNodeColors(cy);
// Setup hover handlers for nodes and edges
hoverHandler(cy);
// Animate dashed selected edges
let dashOffset = 0;
function animateEdges(): void {
dashOffset -= 0.25;
cy.edges(`.${FADED}, .${EXECUTIONS}`).style("line-dash-offset", dashOffset);
requestAnimationFrame(animateEdges);
}
animateEdges();
// Node tap handler using selectHandler
cy.on("tap", "node", (event: cytoscape.EventObject) => {
const node = event.target;
selectHandler(cy, node, selectedNodeID, subtype);
});
cy.on("layoutstop", () => {
loading.value = false;
// Reveal nodes after layout rendering completes
cy.nodes().style("display", "element");
// Preselect the proper node after layout rendering completes
const node = isTesting ? cy.nodes()[0] : cy.nodes().filter((n) => n.data("flow") === initialNodeID);
if (node) selectHandler(cy, node, selectedNodeID, subtype);
});
});
const sse = ref();
const messages = ref<Record<string, any>[]>([]);
watch(messages, (newMessages) => {
if (newMessages.length <= 0) return;
newMessages.forEach((message: Record<string, any>) => {
const matched = cy.nodes().filter((element) => element.data("id") === `${message.tenantId}_${message.namespace}_${message.flowId}`);
if (matched.nonempty()) {
matched.forEach((node: cytoscape.NodeSingular) => {
const state = message.state.current;
node.data({...node.data(), metadata: {...node.data("metadata"), state}});
nextTick(() => {}) // Needed to ensure that table nodes are updated after the DOM is ready
setExecutionNodeColors(cy, node.toArray());
setExecutionEdgeColors(node.connectedEdges(), getStateColor(undefined, state));
});
}
});
},
{deep: true},
);
const openSSE = () => {
if (subtype !== EXECUTION) return;
closeSSE();
sse.value = executionsStore.followExecutionDependencies({id: params.id as string, expandAll: true});
sse.value.onmessage = (event: MessageEvent) => {
const isEnd = event && event.lastEventId === "end-all";
if (isEnd) closeSSE();
const message = JSON.parse(event.data);
if (!message.state) return;
messages.value.push(message);
};
sse.value.onerror = () => {
coreStore.message = {variant: "error", title: t("error"), message: t("something_went_wrong.loading_execution")};
};
};
const closeSSE = () => {
if (!sse.value) return;
sse.value.close();
sse.value = undefined;
};
onBeforeUnmount(() => {
if (subtype === EXECUTION) closeSSE();
});
return {
getElements: () => elements.data,
loading,
selectedNodeID,
selectNode,
handlers: {
zoomIn: () => cy.zoom({level: cy.zoom() + 0.1, renderedPosition: cy.getElementById(selectedNodeID.value!).renderedPosition()}),
zoomOut: () => cy.zoom({level: cy.zoom() - 0.1, renderedPosition: cy.getElementById(selectedNodeID.value!).renderedPosition()}),
clearSelection: () => {
clearClasses(cy, subtype);
selectedNodeID.value = undefined;
fit(cy);
},
fit: () => fit(cy)
}
};
}
/**
* Transforms an API response containing nodes and edges into
* Cytoscape-compatible elements with the given subtype.
*
* @param response - The API response object containing `nodes` and `edges` arrays.
* @param subtype - The node subtype, either `"FLOW"` or `"EXECUTION"`.
* @returns An array of cytoscape elements with correctly typed nodes and edges.
*/
export function transformResponse(response: { nodes: { uid: string; namespace: string; id: string; }[]; edges: { source: string; target: string }[] }, subtype: typeof FLOW | typeof EXECUTION): Element[] {
const nodes: Node[] = response.nodes.map((node) => ({id: node.uid, type: NODE, flow: node.id, namespace: node.namespace, metadata: subtype === FLOW ? {subtype: FLOW} : {subtype: EXECUTION}}));
const edges: Edge[] = response.edges.map((edge) => ({id: uuid(), type: EDGE, source: edge.source, target: edge.target}));
return [...nodes.map((node) => ({data: node} as Element)), ...edges.map((edge) => ({data: edge} as Element))];
}

View File

@@ -0,0 +1,100 @@
import type cytoscape from "cytoscape";
import {cssVariable} from "@kestra-io/ui-libs";
const VARIABLES = {
node: {
default: {
background: "--ks-dependencies-node-background",
border: "--ks-dependencies-node-border",
},
faded: {
background: "--ks-dependencies-node-background-selected-level2",
border: "--ks-dependencies-node-border-selected-level2",
},
selected: {
background: "--ks-dependencies-node-background-selected",
border: "--ks-dependencies-node-border-selected",
},
hovered: {
background: "--ks-dependencies-node-background-hover",
border: "--ks-dependencies-node-border-hover",
},
},
edge: {
default: "--ks-dependencies-node-border",
faded: "--ks-dependencies-edge-selected-level2",
hovered: "--ks-dependencies-edge-hover",
},
};
const nodeBase: cytoscape.Css.Node = {
"label": "data(flow)",
"border-width": 2,
"border-style": "solid",
"color": cssVariable("--ks-content-primary"),
"font-size": 10,
"text-valign": "bottom",
"text-margin-y": 10,
};
const edgeBase: cytoscape.Css.Edge = {
"target-arrow-shape": "triangle",
"curve-style": "bezier",
"width": 2,
"line-style": "solid",
};
const edgeAnimated: cytoscape.Css.Edge = {
"line-style": "dashed",
"line-dash-pattern": [3, 5]
};
function nodeColors(type: keyof typeof VARIABLES.node = "default"): Partial<cytoscape.Css.Node> {
return {
"background-color": cssVariable(VARIABLES.node[type].background)!,
"border-color": cssVariable(VARIABLES.node[type].border)!,
};
}
export function edgeColors(type: keyof typeof VARIABLES.edge = "default"): Partial<cytoscape.Css.Edge> {
return {
"line-color": cssVariable(VARIABLES.edge[type])!,
"target-arrow-color": cssVariable(VARIABLES.edge[type])!,
};
}
export const style: cytoscape.StylesheetJson = [
{
selector: "node",
style: {...nodeBase, ...nodeColors("default")},
},
{
selector: "node.faded",
style: {...nodeBase, ...nodeColors("faded"), "background-opacity": 0.75, "border-opacity": 0.75},
},
{
selector: "node.selected",
style: {...nodeBase, ...nodeColors("selected")},
},
{
selector: "node.hovered",
style: {...nodeBase, ...nodeColors("hovered")},
},
{
selector: "edge",
style: {...edgeBase, ...edgeColors("default"), width: 1},
},
{
selector: "edge.faded",
style: {...edgeBase, ...edgeColors("faded"), ...edgeAnimated},
},
{
selector: "edge.hovered",
style: {...edgeBase, ...edgeColors("hovered")},
},
{
selector: "edge.executions",
style: {...edgeBase, ...edgeAnimated},
},
];

View File

@@ -0,0 +1,31 @@
export const NODE = "NODE" as const;
export const EDGE = "EDGE" as const;
export const FLOW = "FLOW" as const;
export const EXECUTION = "EXECUTION" as const;
type Flow = {
subtype: typeof FLOW;
};
type Execution = {
subtype: typeof EXECUTION;
state?: string;
};
export type Node = {
id: string;
type: "NODE";
flow: string;
namespace: string;
metadata: Flow | Execution;
};
export type Edge = {
id: string;
type: "EDGE";
source: string;
target: string;
};
export type Element = { data: Node } | { data: Edge };

View File

@@ -1,352 +0,0 @@
<template>
<el-card shadow="never" v-loading="isLoading">
<VueFlow
:default-marker-color="cssVariable('--bs-cyan')"
:fit-view-on-init="true"
:nodes-connectable="false"
:nodes-draggable="false"
:elevate-nodes-on-select="false"
>
<Background />
<template #node-flow="props">
<BasicNode
v-bind="props"
:title="props.data.flowId"
:state="props.data.state"
:icon-component="iconVNode"
@expand-dependencies="expand"
@mouseover="onMouseOver"
@mouseleave="onMouseLeave"
@open-link="openFlow"
/>
</template>
<Panel position="top-left">
<el-switch
v-model="expandAll"
:disabled="expandAll"
:active-text="t('expand all')"
@change="load(route.params)"
/>
</Panel>
<Controls :show-interactive="false" />
</VueFlow>
</el-card>
</template>
<script setup>
import {ref, onMounted, inject, nextTick, onBeforeUnmount, watch, h, computed} from "vue";
import {useRoute, useRouter} from "vue-router";
import {
VueFlow,
Panel,
useVueFlow,
Position,
MarkerType,
} from "@vue-flow/core";
import {Controls} from "@vue-flow/controls";
import {Background} from "@vue-flow/background";
import dagre from "dagre";
import {cssVariable} from "@kestra-io/ui-libs";
import BasicNode from "@kestra-io/ui-libs/src/components/nodes/BasicNode.vue";
import TaskIcon from "@kestra-io/ui-libs/src/components/misc/TaskIcon.vue";
const icon = computed(() => {
const GRAY = "#2f3342";
return window.btoa(`
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="25" viewBox="0 0 24 25" fill="none">
<path fill-rule="evenodd" clip-rule="evenodd"
d="M4.34546 9.63757C4.74074 10.5277 5.31782 11.3221 6.03835 11.9681L7.03434 10.8209C6.4739 10.3185 6.02504 9.70059 5.71758 9.00824C5.41012 8.3159 5.25111 7.56496 5.25111 6.80532C5.25111 6.04568 5.41012 5.29475 5.71758 4.6024C6.02504 3.91006 6.4739 3.29216 7.03434 2.78977L6.03835 1.64258C5.31782 2.28851 4.74074 3.08293 4.34546 3.97307C3.95019 4.86321 3.74575 5.82867 3.74575 6.80532C3.74575 7.78197 3.95019 8.74744 4.34546 9.63757ZM16.955 4.38931C17.4802 3.97411 18.1261 3.74777 18.7913 3.74576C19.5894 3.74576 20.3547 4.06807 20.919 4.64177C21.4833 5.21548 21.8004 5.9936 21.8004 6.80494C21.8004 7.61628 21.4833 8.3944 20.919 8.96811C20.3547 9.54181 19.5894 9.86412 18.7913 9.86412C18.2559 9.86126 17.7312 9.71144 17.2725 9.43048L12.3325 14.4529L11.2688 13.3715L16.2088 8.34906C16.0668 8.10583 15.9592 7.84348 15.8891 7.56973H11.2688V6.04014H15.8891C16.055 5.38511 16.4298 4.80451 16.955 4.38931ZM17.9555 8.07674C18.2029 8.24482 18.4938 8.33453 18.7913 8.33453C19.1902 8.33412 19.5727 8.17284 19.8548 7.88607C20.1368 7.59931 20.2955 7.21049 20.2959 6.80494C20.2959 6.50241 20.2076 6.20668 20.0423 5.95514C19.877 5.70361 19.642 5.50756 19.3671 5.39178C19.0922 5.27601 18.7897 5.24572 18.4978 5.30474C18.206 5.36376 17.9379 5.50944 17.7275 5.72336C17.5171 5.93727 17.3738 6.20982 17.3157 6.50653C17.2577 6.80324 17.2875 7.11079 17.4014 7.39029C17.5152 7.66978 17.7081 7.90867 17.9555 8.07674ZM3.74621 15.2177V16.7473H7.19606L2.2417 21.7842L3.30539 22.8656L8.25975 17.8287V21.336H9.76427V15.2177H3.74621ZM15.7823 18.2769H12.7733V19.8064H15.7823V22.1008H21.8004V15.9825H15.7823V18.2769ZM17.2868 20.5712V17.5121H20.2959V20.5712H17.2868ZM8.02885 9.67292C7.62863 9.31407 7.30809 8.87275 7.08853 8.37827C6.86897 7.88378 6.75542 7.34747 6.75542 6.80494C6.75542 6.26241 6.86897 5.72609 7.08853 5.23161C7.30809 4.73713 7.62863 4.29581 8.02885 3.93696L9.02484 5.08415C8.78458 5.29946 8.59215 5.5643 8.46034 5.86106C8.32853 6.15782 8.26035 6.47971 8.26035 6.80532C8.26035 7.13094 8.32853 7.45282 8.46034 7.74958C8.59215 8.04634 8.78458 8.31118 9.02484 8.52649L8.02885 9.67292Z"
fill="${GRAY}" />
</svg>
`);
});
const iconVNode = h(TaskIcon, {customIcon: {icon: icon.value}});
import {apiUrl} from "override/utils/route";
import {linkedElements} from "../../utils/vueFlow";
import {useCoreStore} from "../../stores/core";
import {useExecutionsStore} from "../../stores/executions";
import {useStore} from "vuex";
const store = useStore();
import {useI18n} from "vue-i18n";
const {t} = useI18n({useScope: "global"});
const {
id,
addNodes,
addEdges,
getNodes,
updateNode,
removeNodes,
getEdges,
removeEdges,
fitView,
addSelectedElements,
removeSelectedNodes,
removeSelectedEdges,
} = useVueFlow();
const route = useRoute();
const coreStore = useCoreStore();
const executionsStore = useExecutionsStore();
const axios = inject("axios");
const router = useRouter();
const loaded = ref([]);
const dependencies = ref({
nodes: [],
edges: [],
});
const expanded = ref([]);
const isLoading = ref(false);
const initialLoad = ref(true);
const stateColor = (state) => {
switch (state) {
case "RUNNING":
return "primary";
case "SUCCESS":
return "success";
case "WARNING":
return "warning";
case "FAILED":
return "danger";
default:
return "yellow";
}
};
let sse = ref();
const messages = ref([]);
watch(
messages,
(newMessages) => {
if (newMessages.length <= 0) return;
newMessages.forEach((message) => {
const currentNode = getNodes.value.find(
(n) =>
n.data.flowId === message.flowId &&
n.data.namespace === message.namespace,
);
if (!currentNode) return;
updateNode(currentNode.id, {
...currentNode,
data: {
...currentNode.data,
state: message.state.current,
color: stateColor(message.state.current),
link: {
executionId: message.executionId,
namespace: message.namespace,
flowId: message.flowId,
},
},
});
});
},
{deep: true},
);
const openSSE = () => {
closeSSE();
sse.value = executionsStore.followExecutionDependencies({id: route.params.id, expandAll: expandAll.value})
sse.value.onmessage = (executionEvent) => {
const isEnd = executionEvent && executionEvent.lastEventId === "end-all";
if (isEnd) closeSSE();
const message = JSON.parse(executionEvent.data);
if (!message.state) return;
messages.value.push(message);
};
sse.value.onerror = () => {
coreStore.message = {
variant: "error",
title: t("error"),
message: t("something_went_wrong.loading_execution"),
};
};
};
const closeSSE = () => {
if (!sse.value) return;
sse.value.close();
sse.value = undefined;
};
const expandAll = ref(false);
const load = (options) => {
isLoading.value = true;
return axios
.get(
`${apiUrl(store)}/flows/${options.namespace}/${options.flowId}/dependencies${expandAll.value ? "?expandAll=true" : ""}`,
)
.then((response) => {
loaded.value.push(`${options.namespace}_${options.flowId}`);
if (Object.keys(response.data).length > 0) {
dependencies.value.nodes.push(...response.data.nodes);
dependencies.value.edges.push(...response.data.edges);
}
if (!initialLoad.value) {
let newNodes = new Set(response.data.nodes.map((n) => n.uid));
let oldNodes = new Set(getNodes.value.map((n) => n.id));
const loadedCount = [...newNodes].filter(
(node) => !oldNodes.has(node),
).length;
if (loadedCount > 0) {
coreStore.message = {
variant: "success",
title: t("dependencies loaded"),
message: t("loaded x dependencies", loadedCount),
};
}
}
removeEdges(getEdges.value);
removeNodes(getNodes.value);
initialLoad.value = false;
nextTick(() => {
generateGraph();
openSSE();
});
});
};
const expand = (data) => {
expanded.value.push(data.node.uid);
load({namespace: data.namespace, id: data.flowId});
};
const generateDagreGraph = () => {
const dagreGraph = new dagre.graphlib.Graph();
dagreGraph.setDefaultEdgeLabel(() => ({}));
dagreGraph.setGraph({rankdir: "LR"});
for (const node of dependencies.value.nodes) {
dagreGraph.setNode(node.uid, {
width: 184,
height: 44,
});
}
for (const edge of dependencies.value.edges) {
dagreGraph.setEdge(edge.source, edge.target);
}
dagre.layout(dagreGraph);
return dagreGraph;
};
const getNodePosition = (n) => {
return {x: n.x - n.width / 2, y: n.y - n.height / 2};
};
const generateGraph = () => {
const dagreGraph = generateDagreGraph();
for (const node of dependencies.value.nodes) {
const dagreNode = dagreGraph.node(node.uid);
addNodes([
{
id: node.uid,
type: "flow",
position: getNodePosition(dagreNode),
style: {
width: "184px",
height: "44px",
},
sourcePosition: Position.Right,
targetPosition: Position.Left,
data: {
node: node,
loaded: loaded.value.indexOf(node.uid) >= 0,
namespace: node.namespace,
flowId: node.id,
current:
node.namespace === route.params.namespace &&
node.id === route.params.flowId,
link: true,
expandEnabled: !expanded.value.includes(node.uid),
},
},
]);
}
for (const edge of dependencies.value.edges) {
// TODO: https://github.com/kestra-io/kestra/issues/5350
addEdges([
{
id: edge.source + "|" + edge.target,
source: edge.source,
target: edge.target,
markerEnd: {
id: "marker-custom",
type: MarkerType.ArrowClosed,
},
type: "smoothstep",
},
]);
}
fitView();
isLoading.value = false;
};
onMounted(() => {
load(route.params);
});
onBeforeUnmount(() => {
closeSSE();
});
const onMouseOver = (node) => {
addSelectedElements(linkedElements(id, node.uid));
};
const onMouseLeave = () => {
removeSelectedNodes(getNodes.value);
removeSelectedEdges(getEdges.value);
};
const openFlow = (data) => {
router.push({
name: "flows/update",
params: {
namespace: data.link.namespace,
id: data.link.flowId,
tenant: route.params.tenant,
},
});
};
</script>
<style lang="scss" scoped>
.el-card {
height: calc(100vh - 174px);
:deep(.el-card__body) {
height: 100%;
}
}
</style>

View File

@@ -29,10 +29,10 @@
import Tabs from "../../components/Tabs.vue";
import ExecutionRootTopBar from "./ExecutionRootTopBar.vue";
import DemoAuditLogs from "../demo/AuditLogs.vue";
import ExecutionDependencies from "./ExecutionDependencies.vue";
import Dependencies from "../dependencies/Dependencies.vue";
import {useExecutionsStore} from "../../stores/executions";
import {useFlowStore} from "../../stores/flow";
export default {
mixins: [RouteContext],
@@ -44,9 +44,10 @@
return {
sse: undefined,
previousExecutionId: undefined,
dependenciesCount: undefined
};
},
created() {
async created() {
if(!this.$route.params.tab) {
const tab = localStorage.getItem("executeDefaultTab") || undefined;
this.$router.replace({name: "executions/update", params: {...this.$route.params, tab}});
@@ -54,23 +55,19 @@
this.follow();
window.addEventListener("popstate", this.follow)
this.dependenciesCount = (await this.flowStore.loadDependencies({namespace: this.$route.params.namespace, id: this.$route.params.flowId})).count;
},
mounted() {
this.previousExecutionId = this.$route.params.id
},
watch: {
$route(newValue, oldValue) {
$route() {
this.executionsStore.taskRun = undefined;
if (oldValue.name === newValue.name && this.previousExecutionId !== this.$route.params.id) {
this.follow()
}
// if we change the execution id, we need to close the sse
if (this.executionsStore.execution && this.$route.params.id != this.executionsStore.execution.id) {
this.executionsStore.closeSSE();
window.removeEventListener("popstate", this.follow)
this.executionsStore.execution = undefined;
this.$store.commit("flow/setFlow", undefined);
this.$store.commit("flow/setFlowGraph", undefined);
if (this.previousExecutionId !== this.$route.params.id) {
this.flowStore.flow = undefined;
this.flowStore.flowGraph = undefined;
this.follow();
}
},
},
@@ -114,8 +111,10 @@
},
{
name: "dependencies",
component: ExecutionDependencies,
component: Dependencies,
title: this.$t("dependencies"),
count: this.dependenciesCount,
maximized: true,
props: {
isReadOnly: true,
},
@@ -132,7 +131,7 @@
},
computed: {
...mapState("auth", ["user"]),
...mapStores(useCoreStore, useExecutionsStore),
...mapStores(useCoreStore, useExecutionsStore, useFlowStore),
tabs() {
return this.getTabs();
},
@@ -203,8 +202,8 @@
this.executionsStore.closeSSE();
window.removeEventListener("popstate", this.follow)
this.executionsStore.execution = undefined;
this.$store.commit("flow/setFlow", undefined);
this.$store.commit("flow/setFlowGraph", undefined);
this.flowStore.flow = undefined;
this.flowStore.flowGraph = undefined;
}
};
</script>
@@ -212,4 +211,4 @@
.full-space {
flex: 1 1 auto;
}
</style>
</style>

View File

@@ -19,10 +19,10 @@
</li>
<li>
<trigger-flow
v-if="flow"
:disabled="flow.disabled || isReadOnly"
:flow-id="flow.id"
:namespace="flow.namespace"
v-if="flowStore.flow"
:disabled="flowStore.flow.disabled || isReadOnly"
:flow-id="flowStore.flow.id"
:namespace="flowStore.flow.namespace"
/>
</li>
</template>
@@ -58,7 +58,7 @@
</template>
<template v-if="showStatChart()" #top>
<Sections :dashboard="{id: 'default'}" :charts show-default />
<Sections ref="dashboardComponent" :dashboard="{id: 'default'}" :charts show-default />
</template>
<template #table>
@@ -480,6 +480,7 @@
import {filterLabels} from "./utils"
import {useExecutionsStore} from "../../stores/executions";
import {useFlowStore} from "../../stores/flow.ts";
export default {
mixins: [RouteContext, RestoreUrl, DataTableActions, SelectTableActions],
@@ -628,8 +629,7 @@
},
computed: {
...mapState("auth", ["user"]),
...mapState("flow", ["flow"]),
...mapStores(useMiscStore, useExecutionsStore),
...mapStores(useMiscStore, useExecutionsStore, useFlowStore),
routeInfo() {
return {
title: this.$t("executions")
@@ -668,7 +668,7 @@
return this.user && this.user.isAllowed(permission.EXECUTION, action.DELETE, this.namespace);
},
isAllowedEdit() {
return this.user.isAllowed(permission.FLOW, action.UPDATE, this.flow.namespace);
return this.user.isAllowed(permission.FLOW, action.UPDATE, this.flowStore.flow.namespace);
},
hasAnyExecute() {
return this.user.hasAnyActionOnAnyNamespace(permission.EXECUTION, action.CREATE);
@@ -771,6 +771,7 @@
},
refresh() {
this.recomputeInterval = !this.recomputeInterval;
this.$refs.dashboardComponent.refreshCharts();
this.load();
},
selectionMapper(execution) {
@@ -856,7 +857,7 @@
if (params) {
options = {...options, ...params}
}
const action = actionMap[queryAction]();
return action(options)
.then(r => {
@@ -869,7 +870,7 @@
if (params) {
options = {...options, ...params}
}
const action = actionMap[byIdAction]();
return action(options)
.then(r => {
@@ -1069,15 +1070,15 @@
editFlow() {
this.$router.push({
name: "flows/update", params: {
namespace: this.flow.namespace,
id: this.flow.id,
namespace: this.flowStore.flow.namespace,
id: this.flowStore.flow.id,
tab: "edit",
tenant: this.$route.params.tenant
}
})
},
emitStateCount() {
const runningCount = this.executionsStore.executions.filter(execution =>
const runningCount = this.executionsStore.executions.filter(execution =>
execution.state.current === State.RUNNING
)?.length;
const totalCount = this.executionsStore.total;

View File

@@ -1,5 +1,11 @@
<template>
<el-button size="small" type="primary" :icon="EyeOutline" @click="getFilePreview">
<el-button
size="small"
type="primary"
:icon="EyeOutline"
@click="getFilePreview"
:disabled="isZipFile"
>
{{ $t("preview") }}
</el-button>
<drawer
@@ -164,7 +170,11 @@
},
maxPreviewOptions() {
return [10, 25, 100, 500, 1000, 5000, 10000, 25000, 50000].filter(value => value <= this.configPreviewMaxRows())
}
},
isZipFile() {
// Checks if the file extension is .zip (case-insensitive)
return this.value?.toLowerCase().endsWith(".zip");
},
},
emits: ["preview"],
methods: {

View File

@@ -46,7 +46,7 @@
</el-form-item>
<el-form-item>
<el-button-group class="ks-b-group">
<restart :execution="executionsStore.execution" class="ms-0" @follow="forwardEvent('follow', $event)" />
<restart v-if="executionsStore.execution" :execution="executionsStore.execution" class="ms-0" @follow="forwardEvent('follow', $event)" />
<el-button @click="downloadContent()">
<kicon :tooltip="$t('download logs')">
<download />

View File

@@ -83,6 +83,7 @@
import action from "../../models/action";
import {State} from "@kestra-io/ui-libs"
import ExecutionUtils from "../../utils/executionUtils";
import {useFlowStore} from "../../stores/flow";
export default {
inheritAttrs: false,
@@ -129,14 +130,13 @@
methods: {
loadRevision() {
this.revisionsSelected = this.execution.flowRevision
this.$store
.dispatch("flow/loadRevisions", {
namespace: this.execution.namespace,
id: this.execution.flowId
})
this.flowStore.loadRevisions({
namespace: this.execution.namespace,
id: this.execution.flowId
})
},
restartLastRevision() {
this.revisionsSelected = this.revisions[this.revisions.length - 1].revision;
this.revisionsSelected = this.flowStore.revisions[this.flowStore.revisions.length - 1].revision;
this.restart();
},
restart() {
@@ -179,13 +179,12 @@
},
computed: {
...mapState("auth", ["user"]),
...mapState("flow", ["revisions"]),
...mapStores(useExecutionsStore),
...mapStores(useExecutionsStore, useFlowStore),
replayOrRestart() {
return this.isReplay ? "replay" : "restart";
},
revisionsOptions() {
return (this.revisions || [])
return (this.flowStore.revisions || [])
.map((revision) => {
return {
value: revision.revision,

View File

@@ -102,7 +102,8 @@
loadDefinition() {
this.executionsStore.loadFlowForExecution({
flowId: this.execution.flowId,
namespace: this.execution.namespace
namespace: this.execution.namespace,
store: true
});
},
},

View File

@@ -7,7 +7,7 @@
:flow-id="execution.flowId"
:namespace="execution.namespace"
:flow-graph="flowGraph"
:source="flow?.source"
:source="flowStore.flow?.source"
:execution="execution"
:expanded-subflows="expandedSubflows"
is-read-only
@@ -24,19 +24,18 @@
</template>
<script>
import throttle from "lodash/throttle";
import {mapState} from "vuex";
import {mapStores} from "pinia";
import {Utils, State} from "@kestra-io/ui-libs";
import LowCodeEditor from "../inputs/LowCodeEditor.vue";
import {useExecutionsStore} from "../../stores/executions";
import {useFlowStore} from "../../stores/flow";
export default {
emits: ["follow"],
components: {
LowCodeEditor
},
computed: {
...mapState("flow", ["flow"]),
...mapStores(useExecutionsStore),
...mapStores(useExecutionsStore, useFlowStore),
execution() {
return this.executionsStore.execution;
},

View File

@@ -55,7 +55,7 @@
<div class="right wrapper" :style="{width: 100 - leftWidth + '%', 'z-index': 999}">
<div
v-if="multipleSelected || selectedValue"
class="w-100 overflow-auto p-3"
class="w-100 overflow-auto p-3 content-container"
>
<div class="d-flex justify-content-between pe-none fs-5 values">
<code class="d-block">
@@ -463,6 +463,7 @@
display: flex;
width: 100%;
height: 100vh;
overflow: hidden;
.el-scrollbar.el-cascader-menu:nth-of-type(-n + 2) ul li:first-child,
.values {
@@ -539,3 +540,38 @@
}
}
</style>
<style lang="scss" scoped>
.content-container {
height: calc(100vh - 0px);
overflow-y: auto !important;
overflow-x: hidden;
word-wrap: break-word;
word-break: break-word;
}
:deep(.el-collapse) {
.el-collapse-item__wrap {
overflow-y: auto !important;
max-height: none !important;
}
.el-collapse-item__content {
overflow-y: auto !important;
word-wrap: break-word;
word-break: break-word;
}
}
:deep(.var-value) {
overflow-y: auto !important;
word-wrap: break-word;
word-break: break-word;
}
:deep(pre) {
white-space: pre-wrap !important;
word-wrap: break-word !important;
word-break: break-word !important;
overflow-wrap: break-word !important;
}
</style>

View File

@@ -1,13 +1,13 @@
<template>
<template v-if="flow.concurrency">
<template v-if="flowStore.flow.concurrency">
<div v-if="totalCount > 0 || !runningCountSet" :class="{'d-none': !runningCountSet}">
<el-card class="mb-3">
<div class="row mb-3">
<span class="col d-flex align-items-center">
<h5 class="m-3">RUNNING</h5> {{ runningCount }}/{{ flow.concurrency.limit }} {{ $t('active-slots') }}
<h5 class="m-3">RUNNING</h5> {{ runningCount }}/{{ flowStore.flow.concurrency.limit }} {{ $t('active-slots') }}
</span>
<span class="col d-flex justify-content-end align-items-center">
{{ $t('behavior') }}: <status class="mx-2" :status="flow.concurrency.behavior" size="small" />
{{ $t('behavior') }}: <status class="mx-2" :status="flowStore.flow.concurrency.behavior" size="small" />
</span>
</div>
<div class="progressbar mb-3">
@@ -18,8 +18,8 @@
<executions
:restore-url="false"
:topbar="false"
:namespace="flow.namespace"
:flow-id="flow.id"
:namespace="flowStore.flow.namespace"
:flow-id="flowStore.flow.id"
is-concurrency
:statuses="[State.QUEUED, State.RUNNING, State.PAUSED]"
@state-count="setRunningCount"
@@ -33,11 +33,12 @@
</template>
<script>
import {mapStores} from "pinia";
import Executions from "../executions/Executions.vue";
import Empty from "../layout/empty/Empty.vue";
import {mapState} from "vuex";
import {State} from "@kestra-io/ui-libs";
import Status from "../Status.vue";
import {useFlowStore} from "../../stores/flow";
export default {
inheritAttrs: false,
@@ -67,12 +68,12 @@
}
},
computed: {
...mapState("flow", ["flow"]),
...mapStores(useFlowStore),
State() {
return State
},
progress() {
return this.runningCount / this.flow.concurrency.limit * 100
return this.runningCount / this.flowStore.flow.concurrency.limit * 100
}
}
}

View File

@@ -1,12 +1,12 @@
<template>
<top-nav-bar :title="routeInfo.title" />
<section class="full-container">
<MultiPanelEditorView v-if="flow" />
<MultiPanelEditorView v-if="flowStore.flow" />
</section>
</template>
<script>
import {mapMutations, mapState} from "vuex";
import {mapState} from "vuex";
import {mapStores} from "pinia";
import * as YAML_UTILS from "@kestra-io/ui-libs/flow-yaml-utils";
import RouteContext from "../../mixins/routeContext";
@@ -17,6 +17,8 @@
import {useCoreStore} from "../../stores/core";
import {getRandomFlowID} from "../../../scripts/product/flow";
import {useEditorStore} from "../../stores/editor";
import {useFlowStore} from "../../stores/flow";
export default {
mixins: [RouteContext],
@@ -26,27 +28,25 @@
},
created() {
this.$store.commit("flow/setIsCreating", true);
this.flowStore.isCreating = true;
if (this.$route.query.reset) {
localStorage.setItem("tourDoneOrSkip", undefined);
this.coreStore.guidedProperties = {...this.coreStore.guidedProperties, tourStarted: true};
this.$tours["guidedTour"]?.start();
}
this.setupFlow()
this.closeAllTabs()
this.editorStore.closeAllTabs()
},
beforeUnmount() {
this.$store.commit("flow/setFlowValidation", undefined);
this.flowStore.flowValidation = undefined;
},
methods: {
...mapMutations("editor", ["closeAllTabs"]),
async setupFlow() {
const blueprintId = this.$route.query.blueprintId;
const blueprintSource = this.$route.query.blueprintSource;
let flowYaml = ""
if (this.$route.query.copy && this.flow){
flowYaml = this.flow.source;
if (this.$route.query.copy && this.flowStore.flow){
flowYaml = this.flowStore.flow.source;
} else if (blueprintId && blueprintSource) {
flowYaml = await this.blueprintsStore.getBlueprintSource({type: blueprintSource, kind: "flow", id: blueprintId});
} else {
@@ -61,17 +61,16 @@ tasks:
message: Hello World! 🚀`;
}
this.$store.commit("flow/setFlowYaml", flowYaml);
this.$store.commit("flow/setFlowYamlBeforeAdd", flowYaml);
this.flowStore.flowYaml = flowYaml;
this.flowStore.flowYamlBeforeAdd = flowYaml;
this.$store.commit("flow/setFlow", {...YAML_UTILS.parse(this.flowYaml), source: this.flowYaml});
this.$store.dispatch("flow/initYamlSource", {});
this.flowStore.flow = {...YAML_UTILS.parse(this.flowYaml), source: this.flowStore.flowYaml};
this.flowStore.initYamlSource();
}
},
computed: {
...mapState("flow", ["flowGraph", "flowYaml", "flow", "flowValidation", "flowYaml"]),
...mapState("auth", ["user"]),
...mapStores(useBlueprintsStore, useCoreStore),
...mapStores(useBlueprintsStore, useCoreStore, useEditorStore, useFlowStore),
routeInfo() {
return {
title: this.$t("flows")
@@ -82,7 +81,7 @@ tasks:
}
},
beforeRouteLeave(to, from, next) {
this.$store.commit("flow/setFlow", null);
this.flowStore.flow = undefined;
next();
}
};

View File

@@ -1,212 +0,0 @@
<template>
<el-card shadow="never" v-loading="isLoading">
<VueFlow
:default-marker-color="cssVariable('--bs-cyan')"
:fit-view-on-init="true"
:nodes-connectable="false"
:nodes-draggable="false"
:elevate-nodes-on-select="false"
>
<Background />
<template #node-flow="props">
<DependenciesNode
v-bind="props"
@expand-dependencies="expand"
@mouseover="onMouseOver"
@mouseleave="onMouseLeave"
@open-link="openFlow"
/>
</template>
<Panel position="top-left">
<el-switch
v-model="expandAll"
:disabled="expandAll"
:active-text="t('expand all')"
@change="load(route.params)"
/>
</Panel>
<Controls :show-interactive="false" />
</VueFlow>
</el-card>
</template>
<script setup>
import {ref, onMounted, inject, nextTick, getCurrentInstance} from "vue";
import {useRoute, useRouter} from "vue-router";
import {VueFlow, Panel, useVueFlow, Position, MarkerType} from "@vue-flow/core"
import {Controls} from "@vue-flow/controls"
import {Background} from "@vue-flow/background";
import dagre from "dagre"
import {cssVariable} from "@kestra-io/ui-libs";
import {DependenciesNode} from "@kestra-io/ui-libs"
import {linkedElements} from "../../utils/vueFlow"
import {useStore} from "vuex";
import {useCoreStore} from "../../stores/core";
import {apiUrl} from "override/utils/route";
const {id, addNodes, addEdges, getNodes, removeNodes, getEdges, removeEdges, fitView, addSelectedElements, removeSelectedNodes, removeSelectedEdges} = useVueFlow();
const route = useRoute();
const store = useStore();
const coreStore = useCoreStore();
const axios = inject("axios")
const router = useRouter();
const t = getCurrentInstance().appContext.config.globalProperties.$t;
const loaded = ref([]);
const dependencies = ref({
nodes: [],
edges: []
});
const expanded = ref([]);
const isLoading = ref(false);
const initialLoad = ref(true);
const expandAll = ref(false);
const load = (options) => {
isLoading.value = true;
return axios
.get(`${apiUrl(store)}/flows/${options.namespace}/${options.id}/dependencies${expandAll.value ? "?expandAll=true" : ""}`)
.then(response => {
loaded.value.push(`${options.namespace}_${options.id}`)
if (Object.keys(response.data).length > 0) {
dependencies.value.nodes.push(...response.data.nodes)
dependencies.value.edges.push(...response.data.edges)
}
if (!initialLoad.value) {
let newNodes = new Set(response.data.nodes.map(n => n.uid))
let oldNodes = new Set(getNodes.value.map(n => n.id))
coreStore.message = {
variant: "success",
title: t("dependencies loaded"),
message: t("loaded x dependencies", [...newNodes].filter(node => !oldNodes.has(node)).length),
}
}
removeEdges(getEdges.value)
removeNodes(getNodes.value)
initialLoad.value = false
nextTick(() => {
generateGraph();
})
})
};
const expand = (data) => {
expanded.value.push(data.node.uid)
load({namespace: data.namespace, id: data.flowId})
};
const generateDagreGraph = () => {
const dagreGraph = new dagre.graphlib.Graph()
dagreGraph.setDefaultEdgeLabel(() => ({}))
dagreGraph.setGraph({rankdir: "LR"})
for (const node of dependencies.value.nodes) {
dagreGraph.setNode(node.uid, {
width: 184 ,
height: 44
})
}
for (const edge of dependencies.value.edges) {
dagreGraph.setEdge(edge.source, edge.target)
}
dagre.layout(dagreGraph)
return dagreGraph;
}
const getNodePosition = (n) => {
return {x: n.x - n.width / 2, y: n.y - n.height / 2};
};
const generateGraph = () => {
const dagreGraph = generateDagreGraph();
for (const node of dependencies.value.nodes) {
const dagreNode = dagreGraph.node(node.uid);
addNodes([{
id: node.uid,
type: "flow",
position: getNodePosition(dagreNode),
style: {
width: "184px",
height: "44px",
},
sourcePosition: Position.Right,
targetPosition: Position.Left,
data: {
node: node,
loaded: loaded.value.indexOf(node.uid) >= 0,
namespace: node.namespace,
flowId: node.id,
current: node.namespace === route.params.namespace && node.id === route.params.id,
color: "pink",
link: true,
expandEnabled: !expanded.value.includes(node.uid)
}
}]);
}
for (const edge of dependencies.value.edges) {
// TODO: https://github.com/kestra-io/kestra/issues/5350
addEdges([{
id: edge.source + "|" + edge.target,
source: edge.source,
target: edge.target,
markerEnd: {
id: "marker-custom",
type: MarkerType.ArrowClosed,
},
type: "smoothstep"
}]);
}
fitView();
isLoading.value = false;
};
onMounted(() => {
load(route.params)
})
const onMouseOver = (node) => {
addSelectedElements(linkedElements(id, node.uid));
}
const onMouseLeave = () => {
removeSelectedNodes(getNodes.value);
removeSelectedEdges(getEdges.value);
}
const openFlow = (data) => {
router.push({
name: "flows/update",
params: {
"namespace": data.namespace,
"id": data.flowId,
tenant: route.params.tenant
},
});
}
</script>
<style lang="scss" scoped>
.el-card {
height: calc(100vh - 174px);
:deep(.el-card__body) {
height: 100%;
}
}
</style>

View File

@@ -9,7 +9,7 @@
</li>
<li>
<router-link v-if="flow && canCreate" :to="{name: 'flows/create', query: {copy: true}}">
<router-link v-if="flowStore.flow && canCreate" :to="{name: 'flows/create', query: {copy: true}}">
<el-button :icon="icon.ContentCopy" size="large">
{{ $t('copy') }}
</el-button>
@@ -17,7 +17,7 @@
</li>
<li>
<trigger-flow v-if="flow && canExecute" :disabled="flow.disabled" :flow-id="flow.id" type="default" :namespace="flow.namespace" />
<trigger-flow v-if="flowStore.flow && canExecute" :disabled="flowStore.flow.disabled" :flow-id="flowStore.flow.id" type="default" :namespace="flowStore.flow.namespace" />
</li>
<li>
@@ -34,16 +34,16 @@
</template>
<script>
import flowTemplateEdit from "../../mixins/flowTemplateEdit";
import {mapState} from "vuex";
import {shallowRef} from "vue";
import {mapStores} from "pinia";
import {useCoreStore} from "../../stores/core";
import TriggerFlow from "./TriggerFlow.vue"
import ContentCopy from "vue-material-design-icons/ContentCopy.vue";
import ContentSave from "vue-material-design-icons/ContentSave.vue";
import Delete from "vue-material-design-icons/Delete.vue";
import {shallowRef} from "vue";
import {useCoreStore} from "../../stores/core";
import flowTemplateEdit from "../../mixins/flowTemplateEdit";
import TriggerFlow from "./TriggerFlow.vue"
import TopNavBar from "../layout/TopNavBar.vue"
import {useFlowStore} from "../../stores/flow";
export default {
components: {
@@ -63,8 +63,7 @@
};
},
computed: {
...mapState("flow", ["flow", "total"]),
...mapStores(useCoreStore),
...mapStores(useCoreStore, useFlowStore),
},
methods: {
stopTour() {
@@ -79,7 +78,7 @@
setTimeout(() => {
if (!this.guidedProperties.tourStarted
&& localStorage.getItem("tourDoneOrSkip") !== "true"
&& this.total === 0) {
&& this.flowStore.total === 0) {
this.$tours["guidedTour"]?.start();
}
}, 200)

View File

@@ -1,13 +1,13 @@
<template>
<multi-panel-editor-view
v-if="flow"
v-if="flowStore.flow"
/>
</template>
<script setup>
import {onBeforeUnmount, computed} from "vue"
import {useStore} from "vuex";
import {onBeforeUnmount} from "vue"
import MultiPanelEditorView from "./MultiPanelEditorView.vue";
import {useFlowStore} from "../../stores/flow";
defineEmits([
"expand-subflow"
@@ -32,10 +32,9 @@
}
})
const store = useStore();
const flow = computed(() => store.state.flow.flow);
const flowStore = useFlowStore();
onBeforeUnmount(() => {
store.commit("flow/setFlowValidation", undefined);
flowStore.flowValidation = undefined;
})
</script>

View File

@@ -1,10 +1,11 @@
<template>
<executions :restore-url="false" filter :topbar="false" :namespace="flow.namespace" :flow-id="flow.id" />
<executions :restore-url="false" filter :topbar="false" :namespace="flowStore.flow?.namespace" :flow-id="flowStore.flow?.id" />
</template>
<script>
import {mapStores} from "pinia";
import Executions from "../executions/Executions.vue";
import {mapState} from "vuex";
import {useFlowStore} from "../../stores/flow";
export default {
inheritAttrs: false,
@@ -12,7 +13,7 @@
Executions,
},
computed: {
...mapState("flow", ["flow"]),
...mapStores(useFlowStore)
}
};
</script>

View File

@@ -20,7 +20,7 @@
:popper-class="
tooltipContent === '' ? 'd-none' : 'tooltip-stats'
"
v-if="aggregatedMetric"
v-if="flowStore.aggregatedMetrics"
>
<template #content>
<span v-html="tooltipContent" />
@@ -29,7 +29,7 @@
ref="chartRef"
:data="chartData"
:options="options"
v-if="aggregatedMetric"
v-if="flowStore.aggregatedMetrics"
/>
</el-tooltip>
<span v-else>
@@ -46,16 +46,17 @@
</script>
<script lang="ts">
import {defineComponent} from "vue";
import {Bar} from "vue-chartjs";
import {mapState} from "vuex";
import {mapStores} from "pinia";
import {useMiscStore} from "../../stores/misc";
import {useFlowStore} from "../../stores/flow";
import moment from "moment";
import {defaultConfig, getFormat, tooltip} from "../dashboard/composables/charts.js";
import {defaultConfig, getFormat, tooltip} from "../dashboard/composables/charts";
import {cssVariable} from "@kestra-io/ui-libs";
import KestraFilter from "../filter/KestraFilter.vue";
export default {
export default defineComponent({
name: "FlowMetrics",
components: {
Bar,
@@ -65,13 +66,7 @@
this.loadMetrics();
},
computed: {
...mapState("flow", [
"flow",
"metrics",
"aggregatedMetric",
"tasksWithMetrics",
]),
...mapStores(useMiscStore),
...mapStores(useMiscStore, useFlowStore),
xGrid() {
return this.miscStore.theme === "light"
? {}
@@ -90,9 +85,9 @@
},
chartData() {
return {
labels: this.aggregatedMetric.aggregations.map((e) =>
labels: this.flowStore.aggregatedMetrics.aggregations.map((e) =>
moment(e.date).format(
getFormat(this.aggregatedMetric.groupBy),
getFormat(this.flowStore.aggregatedMetrics.groupBy),
),
),
datasets: [
@@ -103,7 +98,7 @@
backgroundColor:
cssVariable("--el-color-success"),
borderRadius: 4,
data: this.aggregatedMetric.aggregations.map(
data: this.flowStore.aggregatedMetrics.aggregations.map(
(e) => (e.value ? e.value : 0),
),
},
@@ -171,52 +166,43 @@
};
},
methods: {
onDateFilterTypeChange(event) {
this.canAutoRefresh = event;
},
loadQuery(base) {
return {
...base
};
},
loadMetrics() {
this.$store.dispatch("flow/loadTasksWithMetrics", {
this.flowStore.loadTasksWithMetrics({
...this.$route.params,
});
this.$store
.dispatch(
this.$route.query.task
? "flow/loadTaskMetrics"
: "flow/loadFlowMetrics",
this.loadQuery({
...this.$route.params,
taskId: this.$route.query.task,
}),
)
.then(() => {
if (this.metrics.length > 0) {
if (
this.$route.query.metric &&
!this.metrics.includes(this.$route.query.metric)
) {
let query = {...this.$route.query};
delete query.metric;
this.flowStore[this.$route.query.task ? "loadTaskMetrics" : "loadFlowMetrics"](
this.loadQuery({
...this.$route.params,
taskId: this.$route.query.task,
}),
).then(() => {
if ((this.flowStore.metrics?.length ?? -1) > 0) {
if (
this.$route.query.metric &&
!this.flowStore.metrics?.includes(this.$route.query.metric)
) {
let query = {...this.$route.query};
delete query.metric;
this.$router
.push({query: query})
.then((_) => this.loadAggregatedMetrics());
} else {
this.loadAggregatedMetrics();
}
this.$router
.push({query: query})
.then(() => this.loadAggregatedMetrics());
} else {
this.loadAggregatedMetrics();
}
});
}
});
},
loadAggregatedMetrics() {
this.isLoading = true;
if (this.display) {
this.$store.dispatch(
`flow/load${this.$route.query?.task ? "Task" : "Flow"}AggregatedMetrics`,
this.flowStore[this.$route.query?.task ? "loadTaskAggregatedMetrics" : "loadFlowAggregatedMetrics"](
this.loadQuery({
...this.$route.params,
...this.$route.query,
@@ -228,7 +214,7 @@
}),
);
} else {
this.$store.commit("flow/setAggregatedMetric", undefined);
this.flowStore.aggregatedMetrics = undefined;
}
this.isLoading = false;
},
@@ -263,7 +249,7 @@
},
},
},
};
});
</script>
<style>

Some files were not shown because too many files have changed in this diff Show More