Compare commits

...

101 Commits

Author SHA1 Message Date
YannC.
602ff849e3 fix: clean translation 2025-08-26 17:49:05 +02:00
github-actions[bot]
155bdca83f chore(version): update to version '0.24.3' 2025-08-26 13:10:59 +00:00
github-actions[bot]
faaaeada3a chore(core): localize to languages other than english (#10904)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-08-26 12:27:13 +02:00
Piyush Bhaskar
6ef35974d7 fix(core): do not overflow the version selection on release notes (#10903) 2025-08-26 13:28:03 +05:30
YannC
46f9bb768f feat: add action to merge release note between OSS and EE (#10882) 2025-08-26 09:42:47 +02:00
Piyush Bhaskar
ab87f63e8c fix(ui): bring better small chart and tooltip. (#10839) 2025-08-26 13:06:04 +05:30
YannC
cdb73ccbd7 fix: allow to enforce editor view when list is unreadable, also truncate too long column (#10885) 2025-08-26 09:12:04 +02:00
brian.mulier
8fc936e0a3 fix(logs): emitAsync is now keeping messages order 2025-08-25 16:54:45 +02:00
brian.mulier
1e0ebc94b8 fix(logs): higher max message length to keep stacktraces in a single log 2025-08-25 16:54:45 +02:00
brian.mulier
5318592eff chore(deps): bump Micronaut platform to 4.9.2
closes #10626
closes #10788
2025-08-25 16:54:45 +02:00
Piyush Bhaskar
2da08f160d fix(core): show the logs for the task from topology graph. (#10890) 2025-08-25 18:48:46 +05:30
Roman Acevedo
8cbc9e7aff ci: backport recent docker semver rework
backport changes from develop made in https://github.com/kestra-io/kestra/pull/10848
2025-08-22 15:26:32 +02:00
brian-mulier-p
f8e15d103f fix(kv): Set task should convert numbers to string if kvType == STRING (#10836) 2025-08-21 09:33:43 +02:00
Loïc Mathieu
49794a4f2a fix(system): properly close the ScheduledExecutorService tasks
This avoids having running threads while the component is supposed to be closed.
2025-08-20 15:57:43 +02:00
Nicolas K.
bafa5fe03c fix(test): disable kafka concurrency queue test (#10755)
Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
2025-08-19 18:12:26 +02:00
nKwiatkowski
208b244f0f chore: update version to 0.24.2 2025-08-19 15:15:36 +02:00
Barthélémy Ledoux
b93976091d fix(core): avoid triggering hundreds of reactivity updates for each icon (#10766) 2025-08-19 14:07:55 +02:00
brian.mulier
eec52d76f0 fix(namespaces): namespace files content was not sent to the flow namespace
part of #7499
2025-08-19 12:18:12 +02:00
brian.mulier
b96fd87572 fix(core): change cache policy on files returned by the webserver that need to stay fresh
closes #7499
2025-08-19 11:55:08 +02:00
brian.mulier
1aa5bfab43 fix(namespaces): properly send editor content upon creating / updating ns file
part of #7499
2025-08-19 11:54:58 +02:00
Roman Acevedo
c4572e86a5 fix(tests): filter out ExecutionKind.TEST from FlowTriggers
- fixes Silence flow trigger on TEST-kind executions kestra-ee#4689
2025-08-19 11:08:06 +02:00
Florian Hussonnois
f2f97bb70c fix(core): fix preconditions rendering for ExecutionOutputs (#10651)
Ensure that preconditions are always re-rendered for any
new executions

Changes:
* add new fluent skipCache methods on RunContextProperty and Property
  classes

Fix: #10651
2025-08-18 21:01:05 +02:00
Roman Acevedo
804c740d3c fix(tests): namespace binding was breaking filtering in Flow page
fixes https://github.com/kestra-io/kestra-ee/issues/4691

the additional namespace binding in Tabs was added in PR https://github.com/kestra-io/kestra/pull/10543 to solve the special case of Namespace creation
2025-08-18 15:05:26 +02:00
Loïc Mathieu
75cd4f44e0 fix(execution): parallel flowable may not end all child flowables
Parallel flowable tasks like `Parallel`, `Dag` and `ForEach` are racy. When a task fails in a branch, other concurrent branches that contain flowables may never end.
We make sure that all children are terminated when a flowable is itself terminated.

Fixes #6780
2025-08-14 12:26:37 +02:00
YannC
f167a2a2bb fix: avoid file being displayed as diff in namespace file editor (#10746)
close #10744
2025-08-14 10:38:51 +02:00
Loïc Mathieu
08d9416e3a fix(execution): concurrency limit didn't work with afterExecutions
This is because the execution is never considered fully terminated, so the concurrency limit is not handled properly.
This also affects SLA, trigger locks, and other cleanup.

The root issue is that, with a worker task from afterExecution, there is no further update on the execution itself (as it is already terminated), so no execution messages are processed again by the executor.

Because of that, the worker task result message from the afterExecution block is the last message; unfortunately, as worker task result messages have no flow attached, the computation of the final termination is incorrect.
The solution is to load the flow inside the executor when it is null and the execution is terminated, which should only occur inside afterExecution.

Fixes #10657
Fixes #8459
Fixes #8609
2025-08-13 09:32:40 +02:00
Prayag
2a879c617c fix(core): Enter key is now validating filter / refreshing data (#9630)
closes #9471

---------

Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-08-12 17:29:45 +02:00
Loïc Mathieu
3227ca7c11 fix(executions): SLA monitor should take into account restarted executions 2025-08-12 11:50:03 +02:00
Loïc Mathieu
428a52ce02 fix(executions): concurrency limit exceeded when restarting an execution
Fixes #7880
2025-08-12 11:49:40 +02:00
YannC.
f58bc4caba chore: update version to 0.23.11 2025-08-12 10:58:41 +02:00
Loïc Mathieu
e99ae9513f fix(executions): correctly fail the request when trying to resume an execution with the wrong inputs
Fixes #9959
2025-08-12 09:40:44 +02:00
Piyush Bhaskar
c8b51fcacf fix(core): reduce size of code block text and padding (#10689) 2025-08-12 11:47:41 +05:30
brian.mulier
813b2f6439 fix(dashboard): avoid duplicate dashboard calls + properly refresh dashboards on refresh button + don't discard component entirely on refresh 2025-08-11 22:29:15 +02:00
brian.mulier
c6b5bca25b fix(dashboard): properly use time filters in queries
closes kestra-io/kestra-ee#4389
2025-08-11 22:29:15 +02:00
brian.mulier
de35d2cdb9 tests(core): add a test to taskrunners to ensure it's working multiple times on the same working directory
part of kestra-io/plugin-ee-kubernetes#45
2025-08-11 15:06:21 +02:00
Loïc Mathieu
a6ffbd59d0 fix(executions): properly fail the task if it contains unsupported unicode sequence
This occurs in Postgres when the `\u0000` unicode sequence is used. Postgres refuses to store any JSONB containing this sequence, as it has no textual representation.
We now properly detect that and fail the task.

Fixes #10326
2025-08-11 11:54:16 +02:00
Piyush Bhaskar
568740a214 fix(flows): copy trigger url properly. (#10645) 2025-08-08 13:13:02 +05:30
Loïc Mathieu
aa0d2c545f fix(executions): allow caching tasks that use the 'workingDir' variable
Fixes #10253
2025-08-08 09:08:00 +02:00
brian.mulier
cda77d5146 fix(core): ensure props with defaults are not marked as required in generated doc 2025-08-07 15:10:16 +02:00
brian.mulier
d4fd1f61ba fix(core): wrong @NotNull import leading to key not being marked as required
closes #9287
2025-08-07 15:10:16 +02:00
github-actions[bot]
9859ea5eb6 chore(version): update to version '0.24.0' 2025-08-05 12:01:23 +00:00
Piyush Bhaskar
aca374a28f fix(flows): ensure plugin documentation change on flow switch (#10546)
Co-authored-by: Barthélémy Ledoux <bledoux@kestra.io>
2025-08-05 15:21:21 +05:30
Barthélémy Ledoux
c413ba95e1 fix(flows): add conditional rendering for restart button based on execution (#10570) 2025-08-05 10:22:53 +02:00
Barthélémy Ledoux
9c6b92619e fix: restore InputForm (#10568) 2025-08-05 09:45:10 +02:00
brian.mulier
8173e8df51 fix(namespaces): autocomplete in kv & secrets
related to kestra-io/kestra-ee#4559
2025-08-04 20:30:06 +02:00
brian.mulier
5c95505911 fix(executions): avoid SSE error in follow execution dependencies
closes #10560
2025-08-04 20:23:40 +02:00
Barthélémy Ledoux
33f0b533bb fix(flows)*: load flow for execution needs to be stored most of the time (#10566) 2025-08-04 18:55:57 +02:00
brian.mulier
23e35a7f97 chore(version): upgrade version to 0.24.0-rc2-SNAPSHOT 2025-08-04 16:19:44 +02:00
Abhilash T
0357321c58 fix: Updated InputsForm.vue to clear Radio Button Selection (#9654)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
Co-authored-by: Bart Ledoux <bledoux@kestra.io>
2025-08-04 16:03:55 +02:00
Barthélémy Ledoux
5c08403398 fix(flows): no-code - when changing type message avoid warning (#10498) 2025-08-04 15:57:23 +02:00
Barthélémy Ledoux
a63cb71218 fix: remove debugging value from playground (#10541) 2025-08-04 15:57:05 +02:00
brian.mulier
317885b91c fix(executions): restore execution redirect & subflow logs view from parent
closes #10528
closes #10551
2025-08-04 15:47:49 +02:00
Piyush Bhaskar
87637302e4 chore(core): remove variable and directly assign. (#10554) 2025-08-04 18:51:14 +05:30
Piyush Bhaskar
056faaaf9f fix(core): proper state detection from parsed data (#10527) 2025-08-04 18:50:53 +05:30
Miloš Paunović
54c74a1328 chore(namespaces): add the needed prop for loading all namespaces inside a selector (#10544) 2025-08-04 12:45:06 +02:00
Miloš Paunović
fae0c88c5e fix(namespaces): amend problems with namespace secrets and kv pairs (#10543)
Closes https://github.com/kestra-io/kestra-ee/issues/4584.
2025-08-04 12:20:37 +02:00
YannC.
db5d83d1cb fix: add missing webhook releases secrets for github releases 2025-08-01 23:22:18 +02:00
brian.mulier
066b947762 fix(core): remove icon for inputs in no-code
closes #10520
2025-08-01 16:32:55 +02:00
Piyush Bhaskar
b6597475b1 fix(namespaces): fixes loading of additional ns (#10518) 2025-08-01 17:01:53 +05:30
brian.mulier
f2610baf15 fix(executions): avoid race condition leading to never-ending follow with non-terminal state 2025-08-01 13:12:59 +02:00
brian.mulier
b619bf76d8 fix(core): ensure instances can read all messages when no consumer group / queue type 2025-08-01 13:12:59 +02:00
Loïc Mathieu
117f453a77 feat(flows): warn on runnable only properties on non-runnable tasks
Closes #9967
Closes #10500
2025-08-01 12:53:24 +02:00
Piyush Bhaskar
053d6276ff fix(executions): do not rely on monaco to get value (#10515) 2025-08-01 13:26:25 +05:30
Barthélémy Ledoux
3870eca70b fix(flows): playground need to use ui-libs (#10506) 2025-08-01 09:06:51 +02:00
Piyush Bhaskar
afd7c216f9 fix(flows): route to flow page (#10514) 2025-08-01 12:13:08 +05:30
Piyush Bhaskar
59a17e88e7 fix(executions): properly handle methods and computed for tabs (#10513) 2025-08-01 12:12:54 +05:30
Piyush Bhaskar
99f8dca1c2 fix(editor): adjust padding for editor (#10497)
* fix(editor): adjust padding for editor

* fix: make padding 16px
2025-08-01 12:12:38 +05:30
YannC
1068c9fe51 fix: handle empty flows list in lastExecutions correctly (#10493) 2025-08-01 07:21:16 +02:00
YannC
ea6d30df7c fix(ui): load correctly filters + refresh dashboard on filter change (#10504) 2025-08-01 07:16:34 +02:00
Loïc Mathieu
04ba7363c2 fix(ci): workflow build artifact doesn't need the plugin version 2025-07-31 14:32:57 +02:00
Loïc Mathieu
281a987944 chore(version): upgrade version to 0.24.0-rc1-SNAPSHOT 2025-07-31 14:20:07 +02:00
github-actions[bot]
c9ce54b0be chore(core): localize to languages other than english (#10494)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 14:19:14 +02:00
github-actions[bot]
ccd9baef3c chore(core): localize to languages other than english (#10489)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 14:19:04 +02:00
Barthélémy Ledoux
97869b9c75 fix(flows): forget all old taskRunId when a new execution (#10487) 2025-07-31 14:17:14 +02:00
Barthélémy Ledoux
1c681c1492 fix(flows): wait longer for widgets to be rendered (#10485) 2025-07-31 14:17:06 +02:00
Barthélémy Ledoux
de2a446f93 fix(flows): load flows documentation when coming back to no-code root (#10374) 2025-07-31 14:17:00 +02:00
Barthélémy Ledoux
d778947017 fix(flows): add the load errors to the flow errors (#10483) 2025-07-31 14:16:47 +02:00
Barthélémy Ledoux
3f97845fdd fix(flows): hide executionkind meta in the logs (#10482) 2025-07-31 14:16:41 +02:00
Barthélémy Ledoux
631cd169a1 fix(executions): do not rely on monaco to get value (#10467) 2025-07-31 14:16:33 +02:00
Barthélémy Ledoux
1648fa076c fix(flows): playground - implement new designs (#10459)
Co-authored-by: brian.mulier <bmmulier@hotmail.fr>
2025-07-31 14:16:26 +02:00
Barthélémy Ledoux
474806882e fix(flows): playground align restart button button (#10415) 2025-07-31 14:16:17 +02:00
Barthélémy Ledoux
65467bd118 fix(flows): playground clear current execution when clearExecutions() (#10414) 2025-07-31 14:16:04 +02:00
YannC
387bbb80ac feat(ui): added http method autocompletion (#10492) 2025-07-31 13:29:22 +02:00
Loïc Mathieu
19d4c64f19 fix(executions): Don't create outputs from the Subflow task when we didn't wait
Indeed, if we didn't wait for the subflow execution, we cannot access its outputs.
2025-07-31 13:07:26 +02:00
Loïc Mathieu
809c0a228c feat(system): improve performance of computeSchedulable
- Store flowIds in a list to avoid computing them multiple times
- Store triggers by ID in a map to avoid iterating over the list of triggers for each flow
2025-07-31 12:34:30 +02:00
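The map-based lookup can be pictured with a small, self-contained sketch (the record and class names below are illustrative, not the Kestra types): instead of scanning the full trigger list for every flow, the triggers are indexed by ID once and each flow resolves its trigger with a single lookup.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative types only; the real code works on Kestra flows and triggers.
record Trigger(String id) {}
record Flow(String flowId, String triggerId) {}

class SchedulableIndex {
    // Build the index once: O(triggers)
    static Map<String, Trigger> indexById(List<Trigger> triggers) {
        return triggers.stream()
            .collect(Collectors.toMap(Trigger::id, Function.identity()));
    }

    // Resolve each flow's trigger with a hash lookup instead of a linear scan,
    // turning the overall cost from O(flows * triggers) into O(flows + triggers).
    static Optional<Trigger> resolve(Flow flow, Map<String, Trigger> byId) {
        return Optional.ofNullable(byId.get(flow.triggerId()));
    }
}
```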
Piyush Bhaskar
6a045900fb fix(core): remove top spacing from the no-execution page and remove the redundant code (#10445)
Co-authored-by: Miloš Paunović <paun992@hotmail.com>
2025-07-31 13:28:08 +05:30
Piyush Bhaskar
4ada5fe8f3 fix(executions): make columns that are not links normal text (#10460)
* fix(executions): make it normal text

* fix(executions): use monospace font only
2025-07-31 13:27:45 +05:30
github-actions[bot]
998087ca30 chore(core): localize to languages other than english (#10471)
Extended localization support by adding translations for multiple languages using English as the base. This enhances accessibility and usability for non-English-speaking users while keeping English as the source reference.

Co-authored-by: GitHub Action <actions@github.com>
2025-07-31 08:25:16 +02:00
Malaydewangan09
146338e48f feat(plugins): add script plugins 2025-07-30 23:34:55 +05:30
brian.mulier
de177b925e chore(deps): hardcode vue override version 2025-07-30 19:26:31 +02:00
brian.mulier
04bfb19095 fix(core): avoid follow execution from being discarded too early
closes #10472
closes #7623
2025-07-30 19:26:31 +02:00
brian-mulier-p
c913c48785 fix(core): redesign playground run task button (#10423)
closes #10389
2025-07-30 15:27:49 +02:00
François Delbrayelle
0d5b593d42 fix(): fix icons 2025-07-30 14:55:33 +02:00
weibo1
83f92535c5 feat: Trigger Initialization Method Performance Optimization 2025-07-30 14:54:08 +02:00
Loïc Mathieu
fd6a0a6c11 fix(ci): bad SNAPSHOT repo URL 2025-07-30 12:57:28 +02:00
Loïc Mathieu
104c4c97b4 fix(ci): don't publish docker in build-artifact 2025-07-30 12:05:30 +02:00
Loïc Mathieu
21cd21269f fix(ci): add missing build artifact job 2025-07-30 11:50:26 +02:00
Loïc Mathieu
679befa2fe build(ci): allow downloading the exe from the workflow and not the release
This allows running the workflow even if the release step fails
2025-07-30 11:24:21 +02:00
YannC
8a0ecdeb8a fix(dashboard): pageSize & pageNumber are now correctly passed when fetching a chart (#10413) 2025-07-30 08:45:51 +02:00
YannC.
ee8762e138 fix(ci): correctly pass GH token to release workflow 2025-07-29 15:04:18 +02:00
github-actions[bot]
d16324f265 chore(version): update to version 'v0.24.0-rc0-SNAPSHOT'. 2025-07-29 12:14:49 +00:00
157 changed files with 2705 additions and 1266 deletions

View File

@@ -2,7 +2,7 @@ name: Auto-Translate UI keys and create PR
 on:
   schedule:
-    - cron: "0 9-21 * * *" # Every hour from 9 AM to 9 PM
+    - cron: "0 9-21/3 * * *" # Every 3 hours from 9 AM to 9 PM
   workflow_dispatch:
     inputs:
       retranslate_modified_keys:
@@ -20,7 +20,7 @@ jobs:
     runs-on: ubuntu-latest
     timeout-minutes: 10
     steps:
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        name: Checkout
        with:
          fetch-depth: 0

View File

@@ -27,7 +27,7 @@ jobs:
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
        with:
          # We must fetch at least the immediate parents so that if this is
          # a pull request then we can checkout the head.

View File

@@ -1,147 +0,0 @@
name: Create Docker images on Release
on:
workflow_dispatch:
inputs:
retag-latest:
description: 'Retag latest Docker images'
required: true
type: string
default: "false"
options:
- "true"
- "false"
release-tag:
description: 'Kestra Release Tag'
required: false
type: string
plugin-version:
description: 'Plugin version'
required: false
type: string
default: "LATEST"
env:
PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
jobs:
plugins:
name: List Plugins
runs-on: ubuntu-latest
outputs:
plugins: ${{ steps.plugins.outputs.plugins }}
steps:
# Checkout
- uses: actions/checkout@v4
# Get Plugins List
- name: Get Plugins List
uses: ./.github/actions/plugins-list
id: plugins
with:
plugin-version: ${{ env.PLUGIN_VERSION }}
docker:
name: Publish Docker
needs: [ plugins ]
runs-on: ubuntu-latest
strategy:
matrix:
image:
- name: "-no-plugins"
plugins: ""
packages: jattach
python-libs: ""
- name: ""
plugins: ${{needs.plugins.outputs.plugins}}
packages: python3 python-is-python3 python3-pip curl jattach
python-libs: kestra
steps:
- uses: actions/checkout@v4
# Vars
- name: Set image name
id: vars
run: |
if [[ "${{ inputs.release-tag }}" == "" ]]; then
TAG=${GITHUB_REF#refs/*/}
echo "tag=${TAG}" >> $GITHUB_OUTPUT
else
TAG="${{ inputs.release-tag }}"
echo "tag=${TAG}" >> $GITHUB_OUTPUT
fi
if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
else
echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
fi
# Download release
- name: Download release
uses: robinraju/release-downloader@v1.12
with:
tag: ${{steps.vars.outputs.tag}}
fileName: 'kestra-*'
out-file-path: build/executable
- name: Copy exe to image
run: |
cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
# Docker setup
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Docker - Fix Qemu
shell: bash
run: |
docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# Docker Login
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}
# Docker Build and push
- name: Push to Docker Hub
uses: docker/build-push-action@v6
with:
context: .
push: true
tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
platforms: linux/amd64,linux/arm64
build-args: |
KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
APT_PACKAGES=${{ matrix.image.packages }}
PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
- name: Install regctl
if: github.event.inputs.retag-latest == 'true'
uses: regclient/actions/regctl-installer@main
- name: Retag to latest
if: github.event.inputs.retag-latest == 'true'
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
end:
runs-on: ubuntu-latest
needs:
- docker
if: always()
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
steps:
# Slack
- name: Slack notification
uses: Gamesight/slack-workflow-status@master
if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
name: GitHub Actions
icon_emoji: ':github-actions:'
channel: 'C02DQ1A7JLR' # _int_git channel

View File

@@ -19,7 +19,7 @@ on:
         default: "no input"
 jobs:
   check:
-    timeout-minutes: 10
+    timeout-minutes: 15
     runs-on: ubuntu-latest
     env:
       GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
@@ -32,7 +32,7 @@ jobs:
           password: ${{ github.token }}
       - name: Checkout kestra
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
        with:
          path: kestra

View File

@@ -21,12 +21,12 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          fetch-depth: 0
       # Checkout GitHub Actions
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          repository: kestra-io/actions
          path: actions

View File

@@ -33,13 +33,13 @@ jobs:
            exit 1;
          fi
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          fetch-depth: 0
          path: kestra
       # Checkout GitHub Actions
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          repository: kestra-io/actions
          path: actions

View File

@@ -4,9 +4,8 @@ on:
   workflow_dispatch:
     inputs:
       plugin-version:
-        description: "Kestra version"
-        default: 'LATEST'
-        required: true
+        description: "plugins version"
+        required: false
         type: string
   push:
     branches:
@@ -34,7 +33,7 @@ jobs:
     if: "!startsWith(github.ref, 'refs/heads/releases')"
     uses: ./.github/workflows/workflow-release.yml
     with:
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
+      plugin-version: ${{ inputs.plugin-version != '' && inputs.plugin-version || (github.ref == 'refs/heads/develop' && 'LATEST-SNAPSHOT' || 'LATEST') }}
     secrets:
       DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
       DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
@@ -43,7 +42,8 @@ jobs:
       SONATYPE_GPG_KEYID: ${{ secrets.SONATYPE_GPG_KEYID }}
       SONATYPE_GPG_PASSWORD: ${{ secrets.SONATYPE_GPG_PASSWORD }}
       SONATYPE_GPG_FILE: ${{ secrets.SONATYPE_GPG_FILE }}
       GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
+      SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}
   end:
     runs-on: ubuntu-latest
     needs:

View File

@@ -17,7 +17,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          fetch-depth: 0

View File

@@ -34,7 +34,7 @@ jobs:
          fi
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          fetch-depth: 0

View File

@@ -17,12 +17,12 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          fetch-depth: 0
       # Checkout GitHub Actions
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          repository: kestra-io/actions
          path: actions
@@ -66,12 +66,12 @@ jobs:
       actions: read
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          fetch-depth: 0
       # Checkout GitHub Actions
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          repository: kestra-io/actions
          path: actions
@@ -111,12 +111,12 @@ jobs:
       actions: read
     steps:
       # Checkout
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          fetch-depth: 0
       # Checkout GitHub Actions
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        with:
          repository: kestra-io/actions
          path: actions

View File

@@ -29,7 +29,7 @@ jobs:
       GOOGLE_SERVICE_ACCOUNT: ${{ secrets.GOOGLE_SERVICE_ACCOUNT }}
       SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
     steps:
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        name: Checkout - Current ref
        with:
          fetch-depth: 0

View File

@@ -1,23 +1,7 @@
 name: Build Artifacts
 on:
-  workflow_call:
-    inputs:
-      plugin-version:
-        description: "Kestra version"
-        default: 'LATEST'
-        required: true
-        type: string
-    outputs:
-      docker-tag:
-        value: ${{ jobs.build.outputs.docker-tag }}
-        description: "The Docker image Tag for Kestra"
-      docker-artifact-name:
-        value: ${{ jobs.build.outputs.docker-artifact-name }}
-        description: "The GitHub artifact containing the Kestra docker image name."
-      plugins:
-        value: ${{ jobs.build.outputs.plugins }}
-        description: "The Kestra plugins list used for the build."
+  workflow_call: {}
 jobs:
   build:
@@ -31,7 +15,7 @@ jobs:
       PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
     steps:
       - name: Checkout - Current ref
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
        with:
          fetch-depth: 0
@@ -82,55 +66,6 @@ jobs:
        run: |
          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
-      # Docker Tag
-      - name: Setup - Docker vars
-        id: vars
-        shell: bash
-        run: |
-          TAG=${GITHUB_REF#refs/*/}
-          if [[ $TAG = "master" ]]
-          then
-            TAG="latest";
-          elif [[ $TAG = "develop" ]]
-          then
-            TAG="develop";
-          elif [[ $TAG = v* ]]
-          then
-            TAG="${TAG}";
-          else
-            TAG="build-${{ github.run_id }}";
-          fi
-          echo "tag=${TAG}" >> $GITHUB_OUTPUT
-          echo "artifact=docker-kestra-${TAG}" >> $GITHUB_OUTPUT
-      # Docker setup
-      - name: Docker - Setup QEMU
-        uses: docker/setup-qemu-action@v3
-      - name: Docker - Fix Qemu
-        shell: bash
-        run: |
-          docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
-      - name: Docker - Setup Buildx
-        uses: docker/setup-buildx-action@v3
-      # Docker Build
-      - name: Docker - Build & export image
-        uses: docker/build-push-action@v6
-        if: "!startsWith(github.ref, 'refs/tags/v')"
-        with:
-          context: .
-          push: false
-          file: Dockerfile
-          tags: |
-            kestra/kestra:${{ steps.vars.outputs.tag }}
-          build-args: |
-            KESTRA_PLUGINS=${{ steps.plugins.outputs.plugins }}
-            APT_PACKAGES=${{ env.DOCKER_APT_PACKAGES }}
-            PYTHON_LIBRARIES=${{ env.DOCKER_PYTHON_LIBRARIES }}
-          outputs: type=docker,dest=/tmp/${{ steps.vars.outputs.artifact }}.tar
       # Upload artifacts
       - name: Artifacts - Upload JAR
        uses: actions/upload-artifact@v4
@@ -143,10 +78,3 @@ jobs:
        with:
          name: exe
          path: build/executable/
-      - name: Artifacts - Upload Docker
-        uses: actions/upload-artifact@v4
-        if: "!startsWith(github.ref, 'refs/tags/v')"
-        with:
-          name: ${{ steps.vars.outputs.artifact }}
-          path: /tmp/${{ steps.vars.outputs.artifact }}.tar

View File

@@ -20,7 +20,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Checkout
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
      - name: Cache Node Modules
        id: cache-node-modules

View File

@@ -1,14 +1,17 @@
 name: Github - Release
 on:
+  workflow_dispatch:
   workflow_call:
     secrets:
       GH_PERSONAL_TOKEN:
         description: "The Github personal token."
         required: true
-  push:
-    tags:
-      - '*'
+      SLACK_RELEASES_WEBHOOK_URL:
+        description: "The Slack webhook URL."
+        required: true
 jobs:
   publish:
@@ -17,14 +20,14 @@ jobs:
     steps:
       # Check out
       - name: Checkout - Repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
        with:
          fetch-depth: 0
          submodules: true
       # Checkout GitHub Actions
       - name: Checkout - Actions
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
        with:
          repository: kestra-io/actions
          sparse-checkout-cone-mode: true
@@ -35,7 +38,7 @@ jobs:
       # Download Exec
       # Must be done after checkout actions
       - name: Artifacts - Download executable
-        uses: actions/download-artifact@v4
+        uses: actions/download-artifact@v5
        if: startsWith(github.ref, 'refs/tags/v')
        with:
          name: exe
@@ -75,4 +78,11 @@
            "new_version": "${{ github.ref_name }}",
            "github_repository": "${{ github.repository }}",
            "github_actor": "${{ github.actor }}"
          }
+      - name: Merge Release Notes
+        if: ${{ startsWith(github.ref, 'refs/tags/v') }}
+        uses: ./actions/.github/actions/github-release-note-merge
+        env:
+          GITHUB_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
+          RELEASE_TAG: ${{ github.ref_name }}

View File

@@ -1,22 +1,37 @@
-name: Publish - Docker
+name: Create Docker images on Release
 on:
   workflow_dispatch:
     inputs:
-      plugin-version:
-        description: "Kestra version"
-        default: 'LATEST'
-        required: false
-        type: string
+      retag-latest:
+        description: 'Retag latest Docker images'
+        required: true
+        type: choice
+        default: "false"
+        options:
+          - "true"
+          - "false"
+      release-tag:
+        description: 'Kestra Release Tag (by default, deduced with the ref)'
+        required: false
+        type: string
+      plugin-version:
+        description: 'Plugin version'
+        required: false
+        type: string
+        default: "LATEST"
       force-download-artifact:
         description: 'Force download artifact'
         required: false
-        type: string
+        type: choice
        default: "true"
+        options:
+          - "true"
+          - "false"
   workflow_call:
     inputs:
       plugin-version:
-        description: "Kestra version"
+        description: "Plugin version"
        default: 'LATEST'
        required: false
        type: string
@@ -33,47 +48,93 @@ on:
       description: "The Dockerhub password."
       required: true
+env:
+  PLUGIN_VERSION: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
 jobs:
+  plugins:
+    name: List Plugins
+    runs-on: ubuntu-latest
+    outputs:
+      plugins: ${{ steps.plugins.outputs.plugins }}
+    steps:
+      # Checkout
+      - uses: actions/checkout@v5
+      # Get Plugins List
+      - name: Get Plugins List
+        uses: ./.github/actions/plugins-list
+        id: plugins
+        with: # remap LATEST-SNAPSHOT to LATEST
+          plugin-version: ${{ env.PLUGIN_VERSION == 'LATEST-SNAPSHOT' && 'LATEST' || env.PLUGIN_VERSION }}
   # ********************************************************************************************************************
   # Build
   # ********************************************************************************************************************
   build-artifacts:
     name: Build Artifacts
-    if: ${{ github.event.inputs.force-download-artifact == 'true' }}
+    if: ${{ inputs.force-download-artifact == 'true' }}
     uses: ./.github/workflows/workflow-build-artifacts.yml
-    with:
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
-  # ********************************************************************************************************************
-  # Docker
-  # ********************************************************************************************************************
-  publish:
-    name: Publish - Docker
+  docker:
+    name: Publish Docker
+    needs: [ plugins, build-artifacts ]
+    if: always()
     runs-on: ubuntu-latest
-    needs: build-artifacts
-    if: |
-      always() &&
-      (needs.build-artifacts.result == 'success' ||
-      github.event.inputs.force-download-artifact != 'true')
-    env:
-      PLUGIN_VERSION: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
     strategy:
       matrix:
         image:
-          - tag: -no-plugins
+          - name: "-no-plugins"
+            plugins: ""
             packages: jattach
-            plugins: false
-            python-libraries: ""
-          - tag: ""
-            plugins: true
-            packages: python3 python3-venv python-is-python3 python3-pip nodejs npm curl zip unzip jattach
-            python-libraries: kestra
+            python-libs: ""
+          - name: ""
+            plugins: ${{needs.plugins.outputs.plugins}}
+            packages: python3 python-is-python3 python3-pip curl jattach
+            python-libs: kestra
     steps:
-      - name: Checkout - Current ref
-        uses: actions/checkout@v4
+      - uses: actions/checkout@v5
+      # Vars
+      - name: Set image name
+        id: vars
+        run: |
+          if [[ "${{ inputs.release-tag }}" == "" ]]; then
+            TAG=${GITHUB_REF#refs/*/}
+            echo "tag=${TAG}" >> $GITHUB_OUTPUT
+          else
+            TAG="${{ inputs.release-tag }}"
+            echo "tag=${TAG}" >> $GITHUB_OUTPUT
+          fi
+          if [[ $GITHUB_REF == refs/tags/* ]]; then
+            if [[ $TAG =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
+              # this will remove the patch version number
+              MINOR_SEMVER=${TAG%.*}
+              echo "minor_semver=${MINOR_SEMVER}" >> $GITHUB_OUTPUT
+            else
+              echo "Tag '$TAG' is not a valid semver (vMAJOR.MINOR.PATCH), skipping minor_semver"
+            fi
+          fi
+          if [[ "${{ env.PLUGIN_VERSION }}" == *"-SNAPSHOT" ]]; then
+            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ ${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT;
+          else
+            echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
+          fi
+      # Download executable from artifact
+      - name: Artifacts - Download executable
+        uses: actions/download-artifact@v5
+        with:
+          name: exe
+          path: build/executable
+      - name: Copy exe to image
+        run: |
+          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
       # Docker setup
-      - name: Docker - Setup QEMU
+      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3
      - name: Docker - Fix Qemu
@@ -81,66 +142,59 @@ jobs:
        run: |
          docker run --rm --privileged multiarch/qemu-user-static --reset -p yes -c yes
-      - name: Docker - Setup Docker Buildx
+      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
       # Docker Login
-      - name: Docker - Login to DockerHub
+      - name: Login to DockerHub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_PASSWORD }}
-      # Get Plugins List
-      - name: Plugins - Get List
-        uses: ./.github/actions/plugins-list
-        id: plugins-list
-        if: ${{ matrix.image.plugins}}
-        with:
-          plugin-version: ${{ env.PLUGIN_VERSION }}
-      # Vars
-      - name: Docker - Set variables
-        shell: bash
-        id: vars
-        run: |
-          TAG=${GITHUB_REF#refs/*/}
-          PLUGINS="${{ matrix.image.plugins == true && steps.plugins-list.outputs.plugins || '' }}"
-          if [[ $TAG == v* ]]; then
-            TAG="${TAG}";
-            echo "plugins=${{ matrix.image.plugins }}" >> $GITHUB_OUTPUT
-          elif [[ $TAG = "develop" ]]; then
-            TAG="develop";
-            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
-          else
-            TAG="build-${{ github.run_id }}";
-            echo "plugins=--repositories=https://central.sonatype.com/repository/maven-snapshots/ $PLUGINS" >> $GITHUB_OUTPUT
-          fi
-          echo "tag=${TAG}${{ matrix.image.tag }}" >> $GITHUB_OUTPUT
-      # Build Docker Image
-      - name: Artifacts - Download executable
-        uses: actions/download-artifact@v4
-        with:
-          name: exe
-          path: build/executable
-      - name: Docker - Copy exe to image
-        shell: bash
-        run: |
-          cp build/executable/* docker/app/kestra && chmod +x docker/app/kestra
       # Docker Build and push
-      - name: Docker - Build image
+      - name: Push to Docker Hub
        uses: docker/build-push-action@v6
        with:
          context: .
          push: true
-          tags: kestra/kestra:${{ steps.vars.outputs.tag }}
+          tags: ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }}
          platforms: linux/amd64,linux/arm64
          build-args: |
            KESTRA_PLUGINS=${{ steps.vars.outputs.plugins }}
            APT_PACKAGES=${{ matrix.image.packages }}
-            PYTHON_LIBRARIES=${{ matrix.image.python-libraries }}
+            PYTHON_LIBRARIES=${{ matrix.image.python-libs }}
+      - name: Install regctl
+        if: startsWith(github.ref, 'refs/tags/v')
+        uses: regclient/actions/regctl-installer@main
+      - name: Retag to minor semver version
+        if: startsWith(github.ref, 'refs/tags/v') && steps.vars.outputs.minor_semver != ''
+        run: |
+          regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.minor_semver, matrix.image.name) }}
+      - name: Retag to latest
+        if: startsWith(github.ref, 'refs/tags/v') && inputs.retag-latest == 'true'
+        run: |
+          regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
+  end:
+    runs-on: ubuntu-latest
+    needs:
+      - docker
+    if: always()
+    env:
+      SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
+    steps:
+      # Slack
+      - name: Slack notification
+        uses: Gamesight/slack-workflow-status@master
+        if: ${{ always() && env.SLACK_WEBHOOK_URL != 0 }}
+        with:
+          repo_token: ${{ secrets.GITHUB_TOKEN }}
+          slack_webhook_url: ${{ secrets.SLACK_WEBHOOK_URL }}
+          name: GitHub Actions
+          icon_emoji: ':github-actions:'
+          channel: 'C02DQ1A7JLR' # _int_git channel

View File

@@ -25,7 +25,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Checkout - Current ref
-        uses: actions/checkout@v4
+        uses: actions/checkout@v5
       # Setup build
      - name: Setup - Build

View File

@@ -4,7 +4,7 @@ on:
   workflow_dispatch:
     inputs:
       plugin-version:
-        description: "Kestra version"
+        description: "plugins version"
        default: 'LATEST'
        required: false
        type: string
@@ -16,7 +16,7 @@ on:
   workflow_call:
     inputs:
       plugin-version:
-        description: "Kestra version"
+        description: "plugins version"
        default: 'LATEST'
        required: false
        type: string
@@ -42,21 +42,25 @@ on:
     SONATYPE_GPG_FILE:
       description: "The Sonatype GPG file."
       required: true
+    GH_PERSONAL_TOKEN:
+      description: "GH personnal Token."
+      required: true
+    SLACK_RELEASES_WEBHOOK_URL:
+      description: "Slack webhook for releases channel."
+      required: true
 jobs:
   build-artifacts:
     name: Build - Artifacts
     uses: ./.github/workflows/workflow-build-artifacts.yml
-    with:
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
   Docker:
     name: Publish Docker
     needs: build-artifacts
     uses: ./.github/workflows/workflow-publish-docker.yml
-    if: startsWith(github.ref, 'refs/heads/develop') || github.event.inputs.publish-docker == 'true'
+    if: github.ref == 'refs/heads/develop' || inputs.publish-docker == 'true'
     with:
       force-download-artifact: 'false'
-      plugin-version: ${{ github.event.inputs.plugin-version != null && github.event.inputs.plugin-version || 'LATEST' }}
+      plugin-version: ${{ inputs.plugin-version != null && inputs.plugin-version || 'LATEST' }}
     secrets:
       DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }}
       DOCKERHUB_PASSWORD: ${{ secrets.DOCKERHUB_PASSWORD }}
@@ -77,4 +81,5 @@ jobs:
     if: startsWith(github.ref, 'refs/tags/v')
     uses: ./.github/workflows/workflow-github-release.yml
     secrets:
       GH_PERSONAL_TOKEN: ${{ secrets.GH_PERSONAL_TOKEN }}
+      SLACK_RELEASES_WEBHOOK_URL: ${{ secrets.SLACK_RELEASES_WEBHOOK_URL }}

View File

@@ -27,7 +27,7 @@ jobs:
       ui: ${{ steps.changes.outputs.ui }}
       backend: ${{ steps.changes.outputs.backend }}
     steps:
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v5
        if: "!startsWith(github.ref, 'refs/tags/v')"
      - uses: dorny/paths-filter@v3
        if: "!startsWith(github.ref, 'refs/tags/v')"

View File

@@ -87,13 +87,18 @@
 #plugin-powerbi:io.kestra.plugin:plugin-powerbi:LATEST
 #plugin-pulsar:io.kestra.plugin:plugin-pulsar:LATEST
 #plugin-redis:io.kestra.plugin:plugin-redis:LATEST
+#plugin-scripts:io.kestra.plugin:plugin-script-bun:LATEST
+#plugin-scripts:io.kestra.plugin:plugin-script-deno:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-go:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-groovy:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-jbang:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-julia:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-jython:LATEST
+#plugin-scripts:io.kestra.plugin:plugin-script-lua:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-nashorn:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-node:LATEST
+#plugin-scripts:io.kestra.plugin:plugin-script-perl:LATEST
+#plugin-scripts:io.kestra.plugin:plugin-script-php:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-powershell:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-python:LATEST
 #plugin-scripts:io.kestra.plugin:plugin-script-r:LATEST

View File

@@ -122,12 +122,13 @@ public class JsonSchemaGenerator {
         if (jsonNode instanceof ObjectNode clazzSchema && clazzSchema.get("required") instanceof ArrayNode requiredPropsNode && clazzSchema.get("properties") instanceof ObjectNode properties) {
             List<String> requiredFieldValues = StreamSupport.stream(requiredPropsNode.spliterator(), false)
                 .map(JsonNode::asText)
-                .toList();
+                .collect(Collectors.toList());
             properties.fields().forEachRemaining(e -> {
                 int indexInRequiredArray = requiredFieldValues.indexOf(e.getKey());
                 if (indexInRequiredArray != -1 && e.getValue() instanceof ObjectNode valueNode && valueNode.has("default")) {
                     requiredPropsNode.remove(indexInRequiredArray);
+                    requiredFieldValues.remove(indexInRequiredArray);
                 }
             });

View File

@@ -1040,6 +1040,16 @@ public class Execution implements DeletedInterface, TenantInterface {
         return result;
     }

+    /**
+     * Find all children of this {@link TaskRun}.
+     */
+    public List<TaskRun> findChildren(TaskRun parentTaskRun) {
+        return taskRunList.stream()
+            .filter(taskRun -> parentTaskRun.getId().equals(taskRun.getParentTaskRunId()))
+            .toList();
+    }
+
     public List<String> findParentsValues(TaskRun taskRun, boolean withCurrent) {
         return (withCurrent ?
             Stream.concat(findParents(taskRun).stream(), Stream.of(taskRun)) :

View File

@@ -116,7 +116,7 @@ public class State {
     }

     public Instant maxDate() {
-        if (this.histories.size() == 0) {
+        if (this.histories.isEmpty()) {
             return Instant.now();
         }
@@ -124,7 +124,7 @@
     }

     public Instant minDate() {
-        if (this.histories.size() == 0) {
+        if (this.histories.isEmpty()) {
             return Instant.now();
         }
@@ -173,6 +173,11 @@
         return this.current.isBreakpoint();
     }

+    @JsonIgnore
+    public boolean isQueued() {
+        return this.current.isQueued();
+    }
+
     @JsonIgnore
     public boolean isRetrying() {
         return this.current.isRetrying();
@@ -206,6 +211,14 @@
         return this.histories.get(this.histories.size() - 2).state.isPaused();
     }

+    /**
+     * Return true if the execution has failed, then was restarted.
+     * This is to disambiguate between a RESTARTED after PAUSED and RESTARTED after FAILED state.
+     */
+    public boolean failedThenRestarted() {
+        return this.current == Type.RESTARTED && this.histories.get(this.histories.size() - 2).state.isFailed();
+    }
+
     @Introspected
     public enum Type {
         CREATED,
@@ -264,6 +277,10 @@
         return this == Type.KILLED;
     }

+    public boolean isQueued(){
+        return this == Type.QUEUED;
+    }
+
     /**
      * @return states that are terminal to an execution
      */

View File

@@ -68,6 +68,19 @@ public class Property<T> {
     String getExpression() {
         return expression;
     }

+    /**
+     * Returns a new {@link Property} with no cached rendered value,
+     * so that the next render will evaluate its original Pebble expression.
+     * <p>
+     * The returned property will still cache its rendered result.
+     * To re-evaluate on a subsequent render, call {@code skipCache()} again.
+     *
+     * @return a new {@link Property} without a pre-rendered value
+     */
+    public Property<T> skipCache() {
+        return Property.ofExpression(expression);
+    }
+
     /**
      * Build a new Property object with a value already set.<br>
View File

@@ -5,6 +5,7 @@ import io.kestra.core.models.Pauseable;
 import io.kestra.core.utils.Either;

 import java.io.Closeable;
+import java.util.List;
 import java.util.function.Consumer;

 public interface QueueInterface<T> extends Closeable, Pauseable {
@@ -18,7 +19,15 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
         emitAsync(null, message);
     }

-    void emitAsync(String consumerGroup, T message) throws QueueException;
+    default void emitAsync(String consumerGroup, T message) throws QueueException {
+        emitAsync(consumerGroup, List.of(message));
+    }
+
+    default void emitAsync(List<T> messages) throws QueueException {
+        emitAsync(null, messages);
+    }
+
+    void emitAsync(String consumerGroup, List<T> messages) throws QueueException;

     default void delete(T message) throws QueueException {
         delete(null, message);
@@ -27,7 +36,7 @@ public interface QueueInterface<T> extends Closeable, Pauseable {
     void delete(String consumerGroup, T message) throws QueueException;

     default Runnable receive(Consumer<Either<T, DeserializationException>> consumer) {
-        return receive((String) null, consumer);
+        return receive(null, consumer, false);
     }

     default Runnable receive(String consumerGroup, Consumer<Either<T, DeserializationException>> consumer) {

View File

@@ -0,0 +1,12 @@
package io.kestra.core.queues;
import java.io.Serial;
public class UnsupportedMessageException extends QueueException {
@Serial
private static final long serialVersionUID = 1L;
public UnsupportedMessageException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -161,7 +161,7 @@ public interface ExecutionRepositoryInterface extends SaveRepositoryInterface<Ex
     }

     List<Execution> lastExecutions(
-        @Nullable String tenantId,
+        String tenantId,
         @Nullable List<FlowFilter> flows
     );
 }

View File

@@ -86,7 +86,7 @@ public class Executor {
     public Boolean canBeProcessed() {
         return !(this.getException() != null || this.getFlow() == null || this.getFlow() instanceof FlowWithException || this.getFlow().getTasks() == null ||
-            this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint());
+            this.getExecution().isDeleted() || this.getExecution().getState().isPaused() || this.getExecution().getState().isBreakpoint() || this.getExecution().getState().isQueued());
     }

     public Executor withFlow(FlowWithSource flow) {

View File

@@ -237,9 +237,9 @@ public class ExecutorService {
             try {
                 state = flowableParent.resolveState(runContext, execution, parentTaskRun);
             } catch (Exception e) {
-                // This will lead to the next task being still executed but at least Kestra will not crash.
+                // This will lead to the next task being still executed, but at least Kestra will not crash.
                 // This is the best we can do, Flowable task should not fail, so it's a kind of panic mode.
-                runContext.logger().error("Unable to resolve state from the Flowable task: " + e.getMessage(), e);
+                runContext.logger().error("Unable to resolve state from the Flowable task: {}", e.getMessage(), e);
                 state = Optional.of(State.Type.FAILED);
             }

             Optional<WorkerTaskResult> endedTask = childWorkerTaskTypeToWorkerTask(
@@ -589,6 +589,23 @@ public class ExecutorService {
             list = list.stream().filter(workerTaskResult -> !workerTaskResult.getTaskRun().getId().equals(taskRun.getParentTaskRunId()))
                 .collect(Collectors.toCollection(ArrayList::new));
         }
+
+        // If the task is a flowable and its terminated, check that all children are terminated.
+        // This may not be the case for parallel flowable tasks like Parallel, Dag, ForEach...
+        // After a fail task, some child flowable may not be correctly terminated.
+        if (task instanceof FlowableTask<?> && taskRun.getState().isTerminated()) {
+            List<TaskRun> updated = executor.getExecution().findChildren(taskRun).stream()
+                .filter(child -> !child.getState().isTerminated())
+                .map(throwFunction(child -> child.withState(taskRun.getState().getCurrent())))
+                .toList();
+
+            if (!updated.isEmpty()) {
+                Execution execution = executor.getExecution();
+                for (TaskRun child : updated) {
+                    execution = execution.withTaskRun(child);
+                }
+                executor = executor.withExecution(execution, "handledTerminatedFlowableTasks");
+            }
+        }
     }

     metricRegistry

View File

@@ -29,7 +29,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;

 public class RunContextLogger implements Supplier<org.slf4j.Logger> {
-    private static final int MAX_MESSAGE_LENGTH = 1024 * 10;
+    private static final int MAX_MESSAGE_LENGTH = 1024 * 15;

     public static final String ORIGINAL_TIMESTAMP_KEY = "originalTimestamp";

     private final String loggerName;
@@ -80,7 +80,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
         }

         List<LogEntry> result = new ArrayList<>();
-        long i = 0;
         for (String s : split) {
             result.add(LogEntry.builder()
                 .namespace(logEntry.getNamespace())
@@ -98,7 +97,6 @@ public class RunContextLogger implements Supplier<org.slf4j.Logger> {
                 .thread(event.getThreadName())
                 .build()
             );
-            i++;
         }

         return result;
@@ -331,14 +329,11 @@
         protected void append(ILoggingEvent e) {
             e = this.transform(e);

-            logEntries(e, logEntry)
-                .forEach(l -> {
-                    try {
-                        logQueue.emitAsync(l);
-                    } catch (QueueException ex) {
-                        log.warn("Unable to emit logQueue", ex);
-                    }
-                });
+            try {
+                logQueue.emitAsync(logEntries(e, logEntry));
+            } catch (QueueException ex) {
+                log.warn("Unable to emit logQueue", ex);
+            }
         }
     }

View File

@@ -4,15 +4,11 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property; import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.AbstractTrigger;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Validator;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import static io.kestra.core.utils.Rethrow.throwFunction; import static io.kestra.core.utils.Rethrow.throwFunction;
@@ -27,12 +23,19 @@ public class RunContextProperty<T> {
private final RunContext runContext;
private final Task task;
private final AbstractTrigger trigger;
+ private final boolean skipCache;
RunContextProperty(Property<T> property, RunContext runContext) {
+     this(property, runContext, false);
+ }
+ RunContextProperty(Property<T> property, RunContext runContext, boolean skipCache) {
this.property = property;
this.runContext = runContext;
this.task = ((DefaultRunContext) runContext).getTask();
this.trigger = ((DefaultRunContext) runContext).getTrigger();
+ this.skipCache = skipCache;
}
private void validate() {
@@ -45,6 +48,19 @@ public class RunContextProperty<T> {
log.trace("Unable to do validation: no task or trigger found");
}
}
/**
* Returns a new {@link RunContextProperty} that will always be rendered by evaluating
* its original Pebble expression, without using any previously cached value.
* <p>
* This ensures that each time the property is rendered, the underlying
* expression is re-evaluated to produce a fresh result.
*
* @return a new {@link Property} that bypasses the cache
*/
public RunContextProperty<T> skipCache() {
return new RunContextProperty<>(this.property, this.runContext, true);
}
/**
* Render a property then convert it to its target type and validate it.<br>
@@ -55,13 +71,13 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz) throws IllegalVariableEvaluationException {
- var as = Optional.ofNullable(this.property)
+ var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz)));
validate();
return as;
}
/**
* Render a property with additional variables, then convert it to its target type and validate it.<br>
*
@@ -71,7 +87,7 @@ public class RunContextProperty<T> {
* Warning, due to the caching mechanism, this method is not thread-safe.
*/
public Optional<T> as(Class<T> clazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
- var as = Optional.ofNullable(this.property)
+ var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.as(prop, this.runContext, clazz, variables)));
validate();
@@ -89,7 +105,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz) throws IllegalVariableEvaluationException {
- var as = Optional.ofNullable(this.property)
+ var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz)))
.orElse((T) Collections.emptyList());
@@ -108,7 +124,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <I> T asList(Class<I> itemClazz, Map<String, Object> variables) throws IllegalVariableEvaluationException {
- var as = Optional.ofNullable(this.property)
+ var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asList(prop, this.runContext, itemClazz, variables)))
.orElse((T) Collections.emptyList());
@@ -127,7 +143,7 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass) throws IllegalVariableEvaluationException {
- var as = Optional.ofNullable(this.property)
+ var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass)))
.orElse((T) Collections.emptyMap());
@@ -146,11 +162,15 @@ public class RunContextProperty<T> {
*/
@SuppressWarnings("unchecked")
public <K,V> T asMap(Class<K> keyClass, Class<V> valueClass, Map<String, Object> variables) throws IllegalVariableEvaluationException {
- var as = Optional.ofNullable(this.property)
+ var as = Optional.ofNullable(getProperty())
.map(throwFunction(prop -> Property.asMap(prop, this.runContext, keyClass, valueClass, variables)))
.orElse((T) Collections.emptyMap());
validate();
return as;
}
private Property<T> getProperty() {
return skipCache ? this.property.skipCache() : this.property;
}
}
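As a usage illustration only (not part of the diff), the new chain is intended to be called through RunContext rendering; the names below (runContext, this.expression, variables) are assumed placeholders for a task or condition that owns a Property:

    // Hedged sketch: force a fresh Pebble evaluation on every call instead of
    // reusing the value cached by the first render of this property.
    Optional<Boolean> matched = runContext.render(this.expression)   // returns RunContextProperty<Boolean>
        .skipCache()
        .as(Boolean.class, variables);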

View File

@@ -508,14 +508,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
Execution execution = workerTrigger.getTrigger().isFailOnTriggerError() ? TriggerService.generateExecution(workerTrigger.getTrigger(), workerTrigger.getConditionContext(), workerTrigger.getTriggerContext(), (Output) null)
.withState(FAILED) : null;
if (execution != null) {
- RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution))
-     .forEach(log -> {
-         try {
-             logQueue.emitAsync(log);
-         } catch (QueueException ex) {
-             // fail silently
-         }
-     });
+ try {
+     logQueue.emitAsync(RunContextLogger.logEntries(Execution.loggingEventFromException(e), LogEntry.of(execution)));
+ } catch (QueueException ex) {
+     // fail silently
+ }
}
this.workerTriggerResultQueue.emit(
WorkerTriggerResult.builder()
@@ -764,6 +761,7 @@ public class Worker implements Service, Runnable, AutoCloseable {
workerTask = workerTask.withTaskRun(workerTask.getTaskRun().withState(state));
WorkerTaskResult workerTaskResult = new WorkerTaskResult(workerTask.getTaskRun(), dynamicTaskRuns);
this.workerTaskResultQueue.emit(workerTaskResult);
// upload the cache file, hash may not be present if we didn't succeed in computing it
@@ -796,6 +794,10 @@ public class Worker implements Service, Runnable, AutoCloseable {
// If it's a message too big, we remove the outputs
failed = failed.withOutputs(Variables.empty());
}
if (e instanceof UnsupportedMessageException) {
// we expect the offending char is in the output so we remove it
failed = failed.withOutputs(Variables.empty());
}
WorkerTaskResult workerTaskResult = new WorkerTaskResult(failed);
RunContextLogger contextLogger = runContextLoggerFactory.create(workerTask);
contextLogger.logger().error("Unable to emit the worker task result to the queue: {}", e.getMessage(), e);
@@ -818,7 +820,11 @@ public class Worker implements Service, Runnable, AutoCloseable {
private Optional<String> hashTask(RunContext runContext, Task task) {
try {
var map = JacksonMapper.toMap(task);
- var rMap = runContext.render(map);
+ // If there are task provided variables, rendering the task may fail.
+ // The best we can do is to add a fake 'workingDir' as it's an often added variables,
+ // and it should not be part of the task hash.
+ Map<String, Object> variables = Map.of("workingDir", "workingDir");
+ var rMap = runContext.render(map, variables);
var json = JacksonMapper.ofJson().writeValueAsBytes(rMap);
MessageDigest digest = MessageDigest.getInstance("SHA-256");
digest.update(json);
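For reference, a self-contained sketch of the hashing step above using plain Jackson and JDK APIs only; the map literal stands in for the rendered task, and the pinned 'workingDir' entry mirrors the fake variable so it never alters the digest (class name and values are illustrative, not from the diff):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.security.MessageDigest;
    import java.util.HexFormat;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class TaskHashSketch {
        public static void main(String[] args) throws Exception {
            // Stand-in for JacksonMapper.toMap(task) rendered with the placeholder variable.
            Map<String, Object> rendered = new LinkedHashMap<>();
            rendered.put("id", "my-task");
            rendered.put("type", "io.kestra.plugin.core.log.Log");
            rendered.put("workingDir", "workingDir"); // constant placeholder, stable across runs

            byte[] json = new ObjectMapper().writeValueAsBytes(rendered);
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            System.out.println(HexFormat.of().formatHex(digest.digest(json)));
        }
    }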

View File

@@ -8,6 +8,7 @@ import io.kestra.core.events.CrudEventType;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
+ import io.kestra.core.models.HasUID;
import io.kestra.core.models.conditions.Condition;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
@@ -29,10 +30,7 @@ import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.server.ServiceType;
import io.kestra.core.services.*;
- import io.kestra.core.utils.Await;
- import io.kestra.core.utils.Either;
- import io.kestra.core.utils.IdUtils;
- import io.kestra.core.utils.ListUtils;
+ import io.kestra.core.utils.*;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.util.CollectionUtils;
@@ -91,7 +89,9 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private volatile Boolean isReady = false;
private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
+ private ScheduledFuture<?> scheduledFuture;
private final ScheduledExecutorService executionMonitorExecutor = Executors.newSingleThreadScheduledExecutor();
+ private ScheduledFuture<?> executionMonitorFuture;
@Getter
protected SchedulerTriggerStateInterface triggerState;
@@ -152,7 +152,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
this.flowListeners.run();
this.flowListeners.listen(this::initializedTriggers);
- ScheduledFuture<?> evaluationLoop = scheduleExecutor.scheduleAtFixedRate(
+ scheduledFuture = scheduleExecutor.scheduleAtFixedRate(
this::handle,
0,
1,
@@ -162,10 +162,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the evaluation loop thread
Thread.ofVirtual().name("scheduler-evaluation-loop-watch").start(
() -> {
- Await.until(evaluationLoop::isDone);
+ Await.until(scheduledFuture::isDone);
try {
- evaluationLoop.get();
+ scheduledFuture.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -177,7 +177,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
);
// Periodically report metrics and logs of running executions
- ScheduledFuture<?> monitoringLoop = executionMonitorExecutor.scheduleWithFixedDelay(
+ executionMonitorFuture = executionMonitorExecutor.scheduleWithFixedDelay(
this::executionMonitor,
30,
10,
@@ -187,10 +187,10 @@ public abstract class AbstractScheduler implements Scheduler, Service {
// look at exception on the monitoring loop thread
Thread.ofVirtual().name("scheduler-monitoring-loop-watch").start(
() -> {
- Await.until(monitoringLoop::isDone);
+ Await.until(executionMonitorFuture::isDone);
try {
- monitoringLoop.get();
+ executionMonitorFuture.get();
} catch (CancellationException ignored) {
} catch (ExecutionException | InterruptedException e) {
@@ -318,7 +318,7 @@ public abstract class AbstractScheduler implements Scheduler, Service {
}
synchronized (this) { // we need a sync block as we read then update so we should not do it in multiple threads concurrently
- List<Trigger> triggers = triggerState.findAllForAllTenants();
+ Map<String, Trigger> triggers = triggerState.findAllForAllTenants().stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
flows
.stream()
@@ -328,7 +328,8 @@
.flatMap(flow -> flow.getTriggers().stream().filter(trigger -> trigger instanceof WorkerTriggerInterface).map(trigger -> new FlowAndTrigger(flow, trigger)))
.distinct()
.forEach(flowAndTrigger -> {
- Optional<Trigger> trigger = triggers.stream().filter(t -> t.uid().equals(Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger()))).findFirst(); // must have one or none
+ String triggerUid = Trigger.uid(flowAndTrigger.flow(), flowAndTrigger.trigger());
+ Optional<Trigger> trigger = Optional.ofNullable(triggers.get(triggerUid));
if (trigger.isEmpty()) {
RunContext runContext = runContextFactory.of(flowAndTrigger.flow(), flowAndTrigger.trigger());
ConditionContext conditionContext = conditionService.conditionContext(runContext, flowAndTrigger.flow(), null);
@@ -467,9 +468,12 @@ public abstract class AbstractScheduler implements Scheduler, Service {
private List<FlowWithTriggers> computeSchedulable(List<FlowWithSource> flows, List<Trigger> triggerContextsToEvaluate, ScheduleContextInterface scheduleContext) {
List<String> flowToKeep = triggerContextsToEvaluate.stream().map(Trigger::getFlowId).toList();
+ List<String> flowIds = flows.stream().map(FlowId::uidWithoutRevision).toList();
+ Map<String, Trigger> triggerById = triggerContextsToEvaluate.stream().collect(Collectors.toMap(HasUID::uid, Function.identity()));
+ // delete trigger which flow has been deleted
triggerContextsToEvaluate.stream()
- .filter(trigger -> !flows.stream().map(FlowId::uidWithoutRevision).toList().contains(FlowId.uid(trigger)))
+ .filter(trigger -> !flowIds.contains(FlowId.uid(trigger)))
.forEach(trigger -> {
try {
this.triggerState.delete(trigger);
@@ -491,12 +495,8 @@
.map(abstractTrigger -> {
RunContext runContext = runContextFactory.of(flow, abstractTrigger);
ConditionContext conditionContext = conditionService.conditionContext(runContext, flow, null);
- Trigger triggerContext = null;
- Trigger lastTrigger = triggerContextsToEvaluate
-     .stream()
-     .filter(triggerContextToFind -> triggerContextToFind.uid().equals(Trigger.uid(flow, abstractTrigger)))
-     .findFirst()
-     .orElse(null);
+ Trigger triggerContext;
+ Trigger lastTrigger = triggerById.get(Trigger.uid(flow, abstractTrigger));
// If a trigger is not found in triggers to evaluate, then we ignore it
if (lastTrigger == null) {
return null;
@@ -1006,8 +1006,8 @@ public abstract class AbstractScheduler implements Scheduler, Service {
setState(ServiceState.TERMINATING);
this.receiveCancellations.forEach(Runnable::run);
- this.scheduleExecutor.shutdown();
- this.executionMonitorExecutor.shutdown();
+ ExecutorsUtils.closeScheduledThreadPool(this.scheduleExecutor, Duration.ofSeconds(5), List.of(scheduledFuture));
+ ExecutorsUtils.closeScheduledThreadPool(executionMonitorExecutor, Duration.ofSeconds(5), List.of(executionMonitorFuture));
try {
if (onClose != null) {
onClose.run();

View File

@@ -1,6 +1,7 @@
package io.kestra.core.server;
import com.google.common.annotations.VisibleForTesting;
+ import io.kestra.core.utils.ExecutorsUtils;
import io.micronaut.core.annotation.Introspected;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@@ -8,9 +9,11 @@ import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
+ import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +28,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
protected final ServerConfig serverConfig;
private final AtomicBoolean isStopped = new AtomicBoolean(false);
private ScheduledExecutorService scheduledExecutorService;
+ private ScheduledFuture<?> scheduledFuture;
private Instant lastScheduledExecution;
/**
@@ -98,7 +102,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
Duration scheduleInterval = getScheduleInterval();
log.debug("Scheduling '{}' at fixed rate {}.", name, scheduleInterval);
- scheduledExecutorService.scheduleAtFixedRate(
+ scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(
this,
0,
scheduleInterval.toSeconds(),
@@ -133,20 +137,7 @@ public abstract class AbstractServiceLivenessTask implements Runnable, AutoClose
@Override
public void close() {
if (isStopped.compareAndSet(false, true) && scheduledExecutorService != null) {
- scheduledExecutorService.shutdown();
- if (scheduledExecutorService.isTerminated()) {
-     return;
- }
- try {
-     if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
-         log.debug("Failed to wait for scheduled '{}' task termination. Cause: Timeout", name);
-     }
-     log.debug("Stopped scheduled '{}' task.", name);
- } catch (InterruptedException e) {
-     scheduledExecutorService.shutdownNow();
-     Thread.currentThread().interrupt();
-     log.debug("Failed to wait for scheduled '{}' task termination. Cause: Interrupted.", name);
- }
+ ExecutorsUtils.closeScheduledThreadPool(scheduledExecutorService, Duration.ofSeconds(5), List.of(scheduledFuture));
}
}
}

View File

@@ -9,6 +9,7 @@ import io.kestra.core.models.flows.FlowInterface;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.GenericFlow;
+ import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.models.validations.ModelValidator;
@@ -51,7 +52,6 @@ import java.util.stream.StreamSupport;
@Singleton
@Slf4j
public class FlowService {
@Inject
Optional<FlowRepositoryInterface> flowRepository;
@@ -236,6 +236,7 @@ public class FlowService {
}
List<String> warnings = new ArrayList<>(checkValidSubflows(flow, tenantId));
List<io.kestra.plugin.core.trigger.Flow> flowTriggers = ListUtils.emptyOnNull(flow.getTriggers()).stream()
.filter(io.kestra.plugin.core.trigger.Flow.class::isInstance)
.map(io.kestra.plugin.core.trigger.Flow.class::cast)
@@ -246,6 +247,21 @@ public class FlowService {
}
});
// add warning for runnable properties (timeout, workerGroup, taskCache) when used not in a runnable
flow.allTasksWithChilds().forEach(task -> {
if (!(task instanceof RunnableTask<?>)) {
if (task.getTimeout() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'timeout' property as it's only relevant for runnable tasks.");
}
if (task.getTaskCache() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'taskCache' property as it's only relevant for runnable tasks.");
}
if (task.getWorkerGroup() != null) {
warnings.add("The task '" + task.getId() + "' cannot use the 'workerGroup' property as it's only relevant for runnable tasks.");
}
}
});
return warnings;
}

View File

@@ -1,6 +1,7 @@
package io.kestra.core.services;
import io.kestra.core.models.executions.Execution;
+ import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
@@ -10,7 +11,6 @@ import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInte
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.ListUtils;
- import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -24,14 +24,15 @@ import java.util.stream.Stream;
@Singleton
public class FlowTriggerService {
- @Inject
- private ConditionService conditionService;
- @Inject
- private RunContextFactory runContextFactory;
- @Inject
- private FlowService flowService;
+ private final ConditionService conditionService;
+ private final RunContextFactory runContextFactory;
+ private final FlowService flowService;
+ public FlowTriggerService(ConditionService conditionService, RunContextFactory runContextFactory, FlowService flowService) {
+     this.conditionService = conditionService;
+     this.runContextFactory = runContextFactory;
+     this.flowService = flowService;
+ }
// used in EE only
public Stream<FlowWithFlowTrigger> withFlowTriggersOnly(Stream<FlowWithSource> allFlows) {
@@ -53,6 +54,8 @@ public class FlowTriggerService {
List<FlowWithFlowTrigger> validTriggersBeforeMultipleConditionEval = allFlows.stream()
// prevent recursive flow triggers
.filter(flow -> flowService.removeUnwanted(flow, execution))
+ // filter out Test Executions
+ .filter(flow -> execution.getKind() == null)
// ensure flow & triggers are enabled
.filter(flow -> !flow.isDisabled() && !(flow instanceof FlowWithException))
.filter(flow -> flow.getTriggers() != null && !flow.getTriggers().isEmpty())

View File

@@ -173,18 +173,15 @@ public class PluginDefaultService {
try {
return this.injectAllDefaults(flow, false);
} catch (Exception e) {
- RunContextLogger
-     .logEntries(
-         Execution.loggingEventFromException(e),
-         LogEntry.of(execution)
-     )
-     .forEach(logEntry -> {
-         try {
-             logQueue.emitAsync(logEntry);
-         } catch (QueueException e1) {
-             // silently do nothing
-         }
-     });
+ try {
+     logQueue.emitAsync(RunContextLogger
+         .logEntries(
+             Execution.loggingEventFromException(e),
+             LogEntry.of(execution)
+         ));
+ } catch (QueueException e1) {
+     // silently do nothing
+ }
return readWithoutDefaultsOrThrow(flow);
}
}

View File

@@ -3,12 +3,16 @@ package io.kestra.core.utils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
+ import java.time.Duration;
+ import java.util.List;
import java.util.concurrent.*;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
+ import lombok.extern.slf4j.Slf4j;
@Singleton
+ @Slf4j
public class ExecutorsUtils {
@Inject
private ThreadMainFactoryBuilder threadFactoryBuilder;
@@ -61,6 +65,29 @@ public class ExecutorsUtils {
);
}
public static void closeScheduledThreadPool(ScheduledExecutorService scheduledExecutorService, Duration gracePeriod, List<ScheduledFuture<?>> taskFutures) {
scheduledExecutorService.shutdown();
if (scheduledExecutorService.isTerminated()) {
return;
}
try {
if (!scheduledExecutorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
log.warn("Failed to shutdown the ScheduledThreadPoolExecutor during grace period, forcing it to shutdown now");
// Ensure the scheduled task reaches a terminal state to avoid possible memory leak
ListUtils.emptyOnNull(taskFutures).forEach(taskFuture -> taskFuture.cancel(true));
scheduledExecutorService.shutdownNow();
}
log.debug("Stopped scheduled ScheduledThreadPoolExecutor.");
} catch (InterruptedException e) {
scheduledExecutorService.shutdownNow();
Thread.currentThread().interrupt();
log.debug("Failed to shutdown the ScheduledThreadPoolExecutor.");
}
}
private ExecutorService wrap(String name, ExecutorService executorService) {
return ExecutorServiceMetrics.monitor(
meterRegistry,
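As a usage illustration (assumed caller code, mirroring how AbstractScheduler and AbstractServiceLivenessTask now shut down their executors), the new helper takes the pool, a grace period, and the scheduled futures to force into a terminal state:

    import io.kestra.core.utils.ExecutorsUtils;
    import java.time.Duration;
    import java.util.List;
    import java.util.concurrent.*;

    class ScheduledLoopSketch implements AutoCloseable {
        private final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        private final ScheduledFuture<?> loop =
            pool.scheduleAtFixedRate(() -> { /* periodic work */ }, 0, 1, TimeUnit.SECONDS);

        @Override
        public void close() {
            // Waits up to the grace period, then cancels the future and forces shutdown
            // so the scheduled task reaches a terminal state and no thread keeps running.
            ExecutorsUtils.closeScheduledThreadPool(pool, Duration.ofSeconds(5), List.of(loop));
        }
    }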

View File

@@ -54,9 +54,10 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it."); violations.add("Namespace '" + value.getNamespace() + "' does not exist but is required to exist before a flow can be created in it.");
} }
List<Task> allTasks = value.allTasksWithChilds();
// tasks unique id // tasks unique id
List<String> taskIds = value.allTasksWithChilds() List<String> taskIds = allTasks.stream()
.stream()
.map(Task::getId) .map(Task::getId)
.toList(); .toList();
@@ -72,8 +73,8 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
violations.add("Duplicate trigger id with name [" + String.join(", ", duplicateIds) + "]"); violations.add("Duplicate trigger id with name [" + String.join(", ", duplicateIds) + "]");
} }
value.allTasksWithChilds() allTasks.stream()
.stream().filter(task -> task instanceof ExecutableTask<?> executableTask .filter(task -> task instanceof ExecutableTask<?> executableTask
&& value.getId().equals(executableTask.subflowId().flowId()) && value.getId().equals(executableTask.subflowId().flowId())
&& value.getNamespace().equals(executableTask.subflowId().namespace())) && value.getNamespace().equals(executableTask.subflowId().namespace()))
.forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]")); .forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));
@@ -102,7 +103,7 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
.map(input -> Pattern.compile("\\{\\{\\s*inputs." + input.getId() + "\\s*\\}\\}"))
.collect(Collectors.toList());
- List<String> invalidTasks = value.allTasks()
+ List<String> invalidTasks = allTasks.stream()
.filter(task -> checkObjectFieldsWithPatterns(task, inputsWithMinusPatterns))
.map(task -> task.getId())
.collect(Collectors.toList());
@@ -112,12 +113,12 @@ public class FlowValidator implements ConstraintValidator<FlowValidation, Flow>
" [" + String.join(", ", invalidTasks) + "]"); " [" + String.join(", ", invalidTasks) + "]");
} }
List<Pattern> outputsWithMinusPattern = value.allTasks() List<Pattern> outputsWithMinusPattern = allTasks.stream()
.filter(output -> Optional.ofNullable(output.getId()).orElse("").contains("-")) .filter(output -> Optional.ofNullable(output.getId()).orElse("").contains("-"))
.map(output -> Pattern.compile("\\{\\{\\s*outputs\\." + output.getId() + "\\.[^}]+\\s*\\}\\}")) .map(output -> Pattern.compile("\\{\\{\\s*outputs\\." + output.getId() + "\\.[^}]+\\s*\\}\\}"))
.collect(Collectors.toList()); .collect(Collectors.toList());
invalidTasks = value.allTasks() invalidTasks = allTasks.stream()
.filter(task -> checkObjectFieldsWithPatterns(task, outputsWithMinusPattern)) .filter(task -> checkObjectFieldsWithPatterns(task, outputsWithMinusPattern))
.map(task -> task.getId()) .map(task -> task.getId())
.collect(Collectors.toList()); .collect(Collectors.toList());

View File

@@ -90,7 +90,7 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
private static final String OUTPUTS_VAR = "outputs";
@NotNull
- private Property<String> expression;
+ private Property<Boolean> expression;
/** {@inheritDoc} **/
@SuppressWarnings("unchecked")
@@ -105,9 +105,8 @@ public class ExecutionOutputs extends Condition implements ScheduleCondition {
conditionContext.getVariables(),
Map.of(TRIGGER_VAR, Map.of(OUTPUTS_VAR, conditionContext.getExecution().getOutputs()))
);
- String render = conditionContext.getRunContext().render(expression).as(String.class, variables).orElseThrow();
- return !(render.isBlank() || render.trim().equals("false"));
+ return conditionContext.getRunContext().render(expression).skipCache().as(Boolean.class, variables).orElseThrow();
}
private boolean hasNoOutputs(final Execution execution) {

View File

@@ -19,7 +19,6 @@ import lombok.experimental.SuperBuilder;
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
@EqualsAndHashCode
- //@TriggersDataFilterValidation
@Schema(
title = "Display Execution data in a dashboard chart.",
description = "Execution data can be displayed in charts broken out by Namespace and filtered by State, for example."

View File

@@ -208,48 +208,50 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output>, Chi
return Optional.empty();
}
- boolean isOutputsAllowed = runContext
-     .<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
-     .orElse(true);
final Output.OutputBuilder builder = Output.builder()
.executionId(execution.getId())
.state(execution.getState().getCurrent());
- final Map<String, Object> subflowOutputs = Optional
-     .ofNullable(flow.getOutputs())
-     .map(outputs -> outputs
-         .stream()
-         .collect(Collectors.toMap(
-             io.kestra.core.models.flows.Output::getId,
-             io.kestra.core.models.flows.Output::getValue)
-         )
-     )
-     .orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
VariablesService variablesService = ((DefaultRunContext) runContext).getApplicationContext().getBean(VariablesService.class);
- if (subflowOutputs != null) {
-     try {
-         Map<String, Object> outputs = runContext.render(subflowOutputs);
-         FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
-         if (flow.getOutputs() != null && flowInputOutput != null) {
-             outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
-         }
-         builder.outputs(outputs);
-     } catch (Exception e) {
-         runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
-         var state = State.Type.fail(this);
-         Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
-         taskRun = taskRun
-             .withState(state)
-             .withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
-             .withOutputs(variables);
-         return Optional.of(SubflowExecutionResult.builder()
-             .executionId(execution.getId())
-             .state(State.Type.FAILED)
-             .parentTaskRun(taskRun)
-             .build());
+ if (this.wait) { // we only compute outputs if we wait for the subflow
+     boolean isOutputsAllowed = runContext
+         .<Boolean>pluginConfiguration(PLUGIN_FLOW_OUTPUTS_ENABLED)
+         .orElse(true);
+     final Map<String, Object> subflowOutputs = Optional
+         .ofNullable(flow.getOutputs())
+         .map(outputs -> outputs
+             .stream()
+             .collect(Collectors.toMap(
+                 io.kestra.core.models.flows.Output::getId,
+                 io.kestra.core.models.flows.Output::getValue)
+             )
+         )
+         .orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null);
+     if (subflowOutputs != null) {
+         try {
+             Map<String, Object> outputs = runContext.render(subflowOutputs);
+             FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class); // this is hacking
+             if (flow.getOutputs() != null && flowInputOutput != null) {
+                 outputs = flowInputOutput.typedOutputs(flow, execution, outputs);
+             }
+             builder.outputs(outputs);
+         } catch (Exception e) {
+             runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e);
+             var state = State.Type.fail(this);
+             Variables variables = variablesService.of(StorageContext.forTask(taskRun), builder.build());
+             taskRun = taskRun
+                 .withState(state)
+                 .withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
+                 .withOutputs(variables);
+             return Optional.of(SubflowExecutionResult.builder()
+                 .executionId(execution.getId())
+                 .state(State.Type.FAILED)
+                 .parentTaskRun(taskRun)
+                 .build());
+         }
}
}

View File

@@ -9,11 +9,11 @@ import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.codehaus.commons.nullanalysis.NotNull;
import java.util.NoSuchElementException;

View File

@@ -103,8 +103,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {
KVStore kvStore = runContext.namespaceKv(renderedNamespace);
- if (kvType != null && renderedValue instanceof String renderedValueStr) {
-     renderedValue = switch (runContext.render(kvType).as(KVType.class).orElseThrow()) {
+ if (kvType != null){
+     KVType renderedKvType = runContext.render(kvType).as(KVType.class).orElseThrow();
+     if (renderedValue instanceof String renderedValueStr) {
+         renderedValue = switch (renderedKvType) {
case NUMBER -> JacksonMapper.ofJson().readValue(renderedValueStr, Number.class);
case BOOLEAN -> Boolean.parseBoolean((String) renderedValue);
case DATETIME, DATE -> Instant.parse(renderedValueStr);
@@ -112,7 +114,10 @@ public class Set extends Task implements RunnableTask<VoidOutput> {
case JSON -> JacksonMapper.toObject(renderedValueStr);
default -> renderedValue;
};
+ } else if (renderedValue instanceof Number valueNumber && renderedKvType == KVType.STRING) {
+     renderedValue = valueNumber.toString();
}
+ }
kvStore.put(renderedKey, new KVValueAndMetadata(
new KVMetadata(

View File

@@ -56,7 +56,8 @@ public class OverrideRetryInterceptor implements MethodInterceptor<Object, Objec
retry.get("delay", Duration.class).orElse(Duration.ofSeconds(1)), retry.get("delay", Duration.class).orElse(Duration.ofSeconds(1)),
retry.get("maxDelay", Duration.class).orElse(null), retry.get("maxDelay", Duration.class).orElse(null),
new DefaultRetryPredicate(resolveIncludes(retry, "includes"), resolveIncludes(retry, "excludes")), new DefaultRetryPredicate(resolveIncludes(retry, "includes"), resolveIncludes(retry, "excludes")),
Throwable.class Throwable.class,
0
); );
MutableConvertibleValues<Object> attrs = context.getAttributes(); MutableConvertibleValues<Object> attrs = context.getAttributes();

View File

@@ -0,0 +1,11 @@
<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
<g clip-path="url(#clip0_1765_9330)">
<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
</g>
</svg>


View File

@@ -0,0 +1,11 @@
<svg width="512" height="512" viewBox="0 0 512 512" fill="currentColor" xmlns="http://www.w3.org/2000/svg">
<g clip-path="url(#clip0_1765_9330)">
<path d="M244.592 215.915C251.569 208.938 262.881 208.938 269.858 215.915L298.537 244.595C305.514 251.572 305.514 262.883 298.537 269.86L269.858 298.54C262.881 305.517 251.569 305.517 244.592 298.54L215.913 269.86C208.936 262.883 208.936 251.572 215.913 244.595L244.592 215.915Z" />
<path d="M376.685 215.687C383.537 208.835 394.646 208.835 401.498 215.687L430.63 244.818C437.482 251.67 437.482 262.78 430.63 269.632L401.498 298.763C394.646 305.615 383.537 305.615 376.685 298.763L347.553 269.632C340.701 262.78 340.701 251.67 347.553 244.818L376.685 215.687Z" />
<path d="M244.818 83.8243C251.671 76.9722 262.78 76.9722 269.632 83.8243L298.763 112.956C305.616 119.808 305.616 130.917 298.763 137.769L269.632 166.901C262.78 173.753 251.671 173.753 244.818 166.901L215.687 137.769C208.835 130.917 208.835 119.808 215.687 112.956L244.818 83.8243Z" />
<path d="M232.611 178.663C239.588 185.64 239.588 196.951 232.611 203.928L203.931 232.608C196.955 239.585 185.643 239.585 178.666 232.608L149.986 203.928C143.01 196.952 143.01 185.64 149.986 178.663L178.666 149.983C185.643 143.006 196.955 143.006 203.931 149.983L232.611 178.663Z" />
<path d="M166.901 244.818C173.753 251.67 173.753 262.78 166.901 269.632L137.77 298.763C130.918 305.615 119.808 305.615 112.956 298.763L83.8246 269.632C76.9725 262.78 76.9725 251.67 83.8246 244.818L112.956 215.687C119.808 208.835 130.918 208.835 137.77 215.687L166.901 244.818Z" />
<path d="M364.472 178.663C371.449 185.64 371.449 196.951 364.472 203.928L335.793 232.608C328.816 239.585 317.504 239.585 310.527 232.608L281.848 203.928C274.871 196.952 274.871 185.64 281.848 178.663L310.527 149.983C317.504 143.006 328.816 143.006 335.793 149.983L364.472 178.663Z" />
<path d="M285.45 367.015C301.037 382.602 301.037 407.873 285.45 423.46C269.863 439.047 244.591 439.047 229.004 423.46C213.417 407.873 213.417 382.602 229.004 367.015C244.591 351.428 269.863 351.428 285.45 367.015Z" />
</g>
</svg>


View File

@@ -112,7 +112,7 @@ class JsonSchemaGeneratorTest {
var requiredWithDefault = definitions.get("io.kestra.core.docs.JsonSchemaGeneratorTest-RequiredWithDefault");
assertThat(requiredWithDefault, is(notNullValue()));
- assertThat((List<String>) requiredWithDefault.get("required"), not(contains("requiredWithDefault")));
+ assertThat((List<String>) requiredWithDefault.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
var properties = (Map<String, Map<String, Object>>) flow.get("properties");
var listeners = properties.get("listeners");
@@ -253,7 +253,7 @@ class JsonSchemaGeneratorTest {
void requiredAreRemovedIfThereIsADefault() {
Map<String, Object> generate = jsonSchemaGenerator.properties(Task.class, RequiredWithDefault.class);
assertThat(generate, is(not(nullValue())));
- assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault")));
+ assertThat((List<String>) generate.get("required"), not(containsInAnyOrder("requiredWithDefault", "anotherRequiredWithDefault")));
assertThat((List<String>) generate.get("required"), containsInAnyOrder("requiredWithNoDefault"));
}
@@ -466,6 +466,11 @@ class JsonSchemaGeneratorTest {
@Builder.Default
private Property<TaskWithEnum.TestClass> requiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test").build());
@PluginProperty
@NotNull
@Builder.Default
private Property<TaskWithEnum.TestClass> anotherRequiredWithDefault = Property.ofValue(TaskWithEnum.TestClass.builder().testProperty("test2").build());
@PluginProperty
@NotNull
private Property<TaskWithEnum.TestClass> requiredWithNoDefault;

View File

@@ -44,6 +44,7 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
+ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.kestra.core.models.flows.FlowScope.USER;
@@ -740,4 +741,16 @@ public abstract class AbstractExecutionRepositoryTest {
executions = executionRepository.find(Pageable.from(1, 10), MAIN_TENANT, filters);
assertThat(executions.size()).isEqualTo(0L);
}
@Test
protected void shouldReturnLastExecutionsWhenInputsAreNull() {
inject();
List<Execution> lastExecutions = executionRepository.lastExecutions(MAIN_TENANT, null);
assertThat(lastExecutions).isNotEmpty();
Set<String> flowIds = lastExecutions.stream().map(Execution::getFlowId).collect(Collectors.toSet());
assertThat(flowIds.size()).isEqualTo(lastExecutions.size());
}
}

View File

@@ -387,6 +387,13 @@ public abstract class AbstractRunnerTest {
forEachItemCaseTest.forEachItemInIf();
}
@Test
@LoadFlows({"flows/valids/for-each-item-subflow-after-execution.yaml",
"flows/valids/for-each-item-after-execution.yaml"})
protected void forEachItemWithAfterExecution() throws Exception {
forEachItemCaseTest.forEachItemWithAfterExecution();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-cancel.yml"})
void concurrencyCancel() throws Exception {
@@ -423,6 +430,18 @@ public abstract class AbstractRunnerTest {
flowConcurrencyCaseTest.flowConcurrencyWithForEachItem();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-fail.yml"})
protected void concurrencyQueueRestarted() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueRestarted();
}
@Test
@LoadFlows({"flows/valids/flow-concurrency-queue-after-execution.yml"})
void concurrencyQueueAfterExecution() throws Exception {
flowConcurrencyCaseTest.flowConcurrencyQueueAfterExecution();
}
@Test
@ExecuteFlow("flows/valids/executable-fail.yml")
void badExecutable(Execution execution) {

View File

@@ -8,6 +8,7 @@ import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
+ import io.kestra.core.services.ExecutionService;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
@@ -53,6 +54,9 @@ public class FlowConcurrencyCaseTest {
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
+ @Inject
+ private ExecutionService executionService;
public void flowConcurrencyCancel() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel", null, null, Duration.ofSeconds(30));
Execution execution2 = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-cancel");
@@ -278,6 +282,115 @@ public class FlowConcurrencyCaseTest {
assertThat(terminated.getState().getCurrent()).isEqualTo(Type.SUCCESS);
}
public void flowConcurrencyQueueRestarted() throws Exception {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-fail", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
var executionResult1 = new AtomicReference<Execution>();
var executionResult2 = new AtomicReference<Execution>();
CountDownLatch latch1 = new CountDownLatch(2);
AtomicReference<Execution> failedExecution = new AtomicReference<>();
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
executionResult1.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == Type.FAILED) {
failedExecution.set(e.getLeft());
latch1.countDown();
}
}
if (e.getLeft().getId().equals(execution2.getId())) {
executionResult2.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
latch2.countDown();
}
if (e.getLeft().getState().getCurrent() == Type.FAILED) {
latch3.countDown();
}
}
});
assertTrue(latch2.await(1, TimeUnit.MINUTES));
assertThat(failedExecution.get()).isNotNull();
// here the first fail and the second is now running.
// we restart the first one, it should be queued then fail again.
Execution restarted = executionService.restart(failedExecution.get(), null);
executionQueue.emit(restarted);
assertTrue(latch3.await(1, TimeUnit.MINUTES));
assertTrue(latch1.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(Type.FAILED);
// it should have been queued after restarted
assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.RESTARTED)).isTrue();
assertThat(executionResult1.get().getState().getHistories().stream().anyMatch(history -> history.getState() == Type.QUEUED)).isTrue();
assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(Type.FAILED);
assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
public void flowConcurrencyQueueAfterExecution() throws TimeoutException, QueueException, InterruptedException {
Execution execution1 = runnerUtils.runOneUntilRunning(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", null, null, Duration.ofSeconds(30));
Flow flow = flowRepository
.findById(MAIN_TENANT, "io.kestra.tests", "flow-concurrency-queue-after-execution", Optional.empty())
.orElseThrow();
Execution execution2 = Execution.newExecution(flow, null, null, Optional.empty());
executionQueue.emit(execution2);
assertThat(execution1.getState().isRunning()).isTrue();
assertThat(execution2.getState().getCurrent()).isEqualTo(State.Type.CREATED);
var executionResult1 = new AtomicReference<Execution>();
var executionResult2 = new AtomicReference<Execution>();
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
Flux<Execution> receive = TestsUtils.receive(executionQueue, e -> {
if (e.getLeft().getId().equals(execution1.getId())) {
executionResult1.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch1.countDown();
}
}
if (e.getLeft().getId().equals(execution2.getId())) {
executionResult2.set(e.getLeft());
if (e.getLeft().getState().getCurrent() == State.Type.RUNNING) {
latch2.countDown();
}
if (e.getLeft().getState().getCurrent() == State.Type.SUCCESS) {
latch3.countDown();
}
}
});
assertTrue(latch1.await(1, TimeUnit.MINUTES));
assertTrue(latch2.await(1, TimeUnit.MINUTES));
assertTrue(latch3.await(1, TimeUnit.MINUTES));
receive.blockLast();
assertThat(executionResult1.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(executionResult2.get().getState().getHistories().getFirst().getState()).isEqualTo(State.Type.CREATED);
assertThat(executionResult2.get().getState().getHistories().get(1).getState()).isEqualTo(State.Type.QUEUED);
assertThat(executionResult2.get().getState().getHistories().get(2).getState()).isEqualTo(State.Type.RUNNING);
}
private URI storageUpload() throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");

View File

@@ -83,4 +83,24 @@ class RunContextPropertyTest {
runContextProperty = new RunContextProperty<>(Property.<Map<String, String>>builder().expression("{ \"key\": \"{{ key }}\"}").build(), runContext);
assertThat(runContextProperty.asMap(String.class, String.class, Map.of("key", "value"))).containsEntry("key", "value");
}
@Test
void asShouldReturnCachedRenderedProperty() throws IllegalVariableEvaluationException {
var runContext = runContextFactory.of();
var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
assertThat(runContextProperty.as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value1"));
}
@Test
void asShouldNotReturnCachedRenderedPropertyWithSkipCache() throws IllegalVariableEvaluationException {
var runContext = runContextFactory.of();
var runContextProperty = new RunContextProperty<>(Property.<String>builder().expression("{{ variable }}").build(), runContext);
assertThat(runContextProperty.as(String.class, Map.of("variable", "value1"))).isEqualTo(Optional.of("value1"));
assertThat(runContextProperty.skipCache().as(String.class, Map.of("variable", "value2"))).isEqualTo(Optional.of("value2"));
}
}

View File

@@ -140,7 +140,7 @@ class RunContextTest {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
Flux<LogEntry> receive = TestsUtils.receive(workerTaskLogQueue, either -> logs.add(either.getLeft()));
-char[] chars = new char[1024 * 11];
+char[] chars = new char[1024 * 16];
Arrays.fill(chars, 'a');
Map<String, Object> inputs = new HashMap<>(InputsTest.inputs);

View File

@@ -5,6 +5,7 @@ import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.queues.QueueException;
@@ -18,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -77,8 +79,12 @@ public class TaskCacheTest {
@Plugin
public static class CounterTask extends Task implements RunnableTask<CounterTask.Output> {
private String workingDir;
@Override
public Output run(RunContext runContext) throws Exception {
Map<String, Object> variables = Map.of("workingDir", runContext.workingDir().path().toString());
runContext.render(this.workingDir, variables);
return Output.builder()
.counter(COUNTER.incrementAndGet())
.build();

View File

@@ -372,4 +372,44 @@ class FlowServiceTest {
assertThat(exceptions.size()).isZero();
}
@Test
void shouldReturnValidationForRunnablePropsOnFlowable() {
// Given
String source = """
id: dolphin_164914
namespace: company.team
tasks:
- id: for
type: io.kestra.plugin.core.flow.ForEach
values: [1, 2, 3]
workerGroup:
key: toto
timeout: PT10S
taskCache:
enabled: true
tasks:
- id: hello
type: io.kestra.plugin.core.log.Log
message: Hello World! 🚀
workerGroup:
key: toto
timeout: PT10S
taskCache:
enabled: true
""";
// When
List<ValidateConstraintViolation> results = flowService.validate("my-tenant", source);
// Then
assertThat(results).hasSize(1);
assertThat(results.getFirst().getWarnings()).hasSize(3);
assertThat(results.getFirst().getWarnings()).containsExactlyInAnyOrder(
"The task 'for' cannot use the 'timeout' property as it's only relevant for runnable tasks.",
"The task 'for' cannot use the 'taskCache' property as it's only relevant for runnable tasks.",
"The task 'for' cannot use the 'workerGroup' property as it's only relevant for runnable tasks."
);
}
}

View File

@@ -0,0 +1,142 @@
package io.kestra.core.services;
import io.kestra.core.context.TestRunContextFactory;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKind;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionStorageInterface;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.core.log.Log;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Optional;
import static io.kestra.core.repositories.AbstractFlowRepositoryTest.TEST_NAMESPACE;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
@KestraTest
class FlowTriggerServiceTest {
public static final List<Label> EMPTY_LABELS = List.of();
public static final Optional<MultipleConditionStorageInterface> EMPTY_MULTIPLE_CONDITION_STORAGE = Optional.empty();
@Inject
private TestRunContextFactory runContextFactory;
@Inject
private ConditionService conditionService;
@Inject
private FlowService flowService;
private FlowTriggerService flowTriggerService;
@BeforeEach
void setUp() {
flowTriggerService = new FlowTriggerService(conditionService, runContextFactory, flowService);
}
@Test
void computeExecutionsFromFlowTriggers_ok() {
var simpleFlow = aSimpleFlow();
var flowWithFlowTrigger = Flow.builder()
.id("flow-with-flow-trigger")
.namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT)
.tasks(List.of(simpleLogTask()))
.triggers(List.of(
flowTriggerWithNoConditions()
))
.build();
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.SUCCESS);
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
simpleFlowExecution,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
);
assertThat(resultingExecutionsToRun).size().isEqualTo(1);
assertThat(resultingExecutionsToRun.get(0).getFlowId()).isEqualTo(flowWithFlowTrigger.getId());
}
@Test
void computeExecutionsFromFlowTriggers_filteringOutCreatedExecutions() {
var simpleFlow = aSimpleFlow();
var flowWithFlowTrigger = Flow.builder()
.id("flow-with-flow-trigger")
.namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT)
.tasks(List.of(simpleLogTask()))
.triggers(List.of(
flowTriggerWithNoConditions()
))
.build();
var simpleFlowExecution = Execution.newExecution(simpleFlow, EMPTY_LABELS).withState(State.Type.CREATED);
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
simpleFlowExecution,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
);
assertThat(resultingExecutionsToRun).size().isEqualTo(0);
}
@Test
void computeExecutionsFromFlowTriggers_filteringOutTestExecutions() {
var simpleFlow = aSimpleFlow();
var flowWithFlowTrigger = Flow.builder()
.id("flow-with-flow-trigger")
.namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT)
.tasks(List.of(simpleLogTask()))
.triggers(List.of(
flowTriggerWithNoConditions()
))
.build();
var simpleFlowExecutionComingFromATest = Execution.newExecution(simpleFlow, EMPTY_LABELS)
.withState(State.Type.SUCCESS)
.toBuilder()
.kind(ExecutionKind.TEST)
.build();
var resultingExecutionsToRun = flowTriggerService.computeExecutionsFromFlowTriggers(
simpleFlowExecutionComingFromATest,
List.of(simpleFlow, flowWithFlowTrigger),
EMPTY_MULTIPLE_CONDITION_STORAGE
);
assertThat(resultingExecutionsToRun).size().isEqualTo(0);
}
private static Flow aSimpleFlow() {
return Flow.builder()
.id("simple-flow")
.namespace(TEST_NAMESPACE)
.tenantId(MAIN_TENANT)
.tasks(List.of(simpleLogTask()))
.build();
}
private static io.kestra.plugin.core.trigger.Flow flowTriggerWithNoConditions() {
return io.kestra.plugin.core.trigger.Flow.builder()
.id("flowTrigger")
.type(io.kestra.plugin.core.trigger.Flow.class.getName())
.build();
}
private static Log simpleLogTask() {
return Log.builder()
.id(IdUtils.create())
.type(Log.class.getName())
.message("Hello World")
.build();
}
}

View File

@@ -372,6 +372,51 @@ public class ForEachItemCaseTest {
assertThat(correlationId.get().value()).isEqualTo(execution.getId());
}
public void forEachItemWithAfterExecution() throws TimeoutException, InterruptedException, URISyntaxException, IOException, QueueException {
CountDownLatch countDownLatch = new CountDownLatch(26);
AtomicReference<Execution> triggered = new AtomicReference<>();
Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("for-each-item-subflow-after-execution") && execution.getState().getCurrent().isTerminated()) {
triggered.set(execution);
countDownLatch.countDown();
}
});
URI file = storageUpload();
Map<String, Object> inputs = Map.of("file", file.toString(), "batch", 4);
Execution execution = runnerUtils.runOne(MAIN_TENANT, TEST_NAMESPACE, "for-each-item-after-execution", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, inputs),
Duration.ofSeconds(30));
// we should have triggered 26 subflows
assertThat(countDownLatch.await(1, TimeUnit.MINUTES)).isTrue();
receive.blockLast();
// assert on the main flow execution
assertThat(execution.getTaskRunList()).hasSize(5);
assertThat(execution.getTaskRunList().get(2).getAttempts()).hasSize(1);
assertThat(execution.getTaskRunList().get(2).getAttempts().getFirst().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
Map<String, Object> outputs = execution.getTaskRunList().get(2).getOutputs();
assertThat(outputs.get("numberOfBatches")).isEqualTo(26);
assertThat(outputs.get("iterations")).isNotNull();
Map<String, Integer> iterations = (Map<String, Integer>) outputs.get("iterations");
assertThat(iterations.get("CREATED")).isZero();
assertThat(iterations.get("RUNNING")).isZero();
assertThat(iterations.get("SUCCESS")).isEqualTo(26);
// assert on the last subflow execution
assertThat(triggered.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(triggered.get().getFlowId()).isEqualTo("for-each-item-subflow-after-execution");
assertThat((String) triggered.get().getInputs().get("items")).matches("kestra:///io/kestra/tests/for-each-item-after-execution/executions/.*/tasks/each-split/.*\\.txt");
assertThat(triggered.get().getTaskRunList()).hasSize(2);
Optional<Label> correlationId = triggered.get().getLabels().stream().filter(label -> label.key().equals(Label.CORRELATION_ID)).findAny();
assertThat(correlationId.isPresent()).isTrue();
assertThat(correlationId.get().value()).isEqualTo(execution.getId());
}
private URI storageUpload() throws URISyntaxException, IOException {
File tempFile = File.createTempFile("file", ".txt");

View File

@@ -58,4 +58,15 @@ class ParallelTest {
assertThat(execution.findTaskRunsByTaskId("a2").getFirst().getState().getStartDate().isAfter(execution.findTaskRunsByTaskId("a1").getFirst().getState().getEndDate().orElseThrow())).isTrue(); assertThat(execution.findTaskRunsByTaskId("a2").getFirst().getState().getStartDate().isAfter(execution.findTaskRunsByTaskId("a1").getFirst().getState().getEndDate().orElseThrow())).isTrue();
assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getStartDate().isAfter(execution.findTaskRunsByTaskId("e1").getFirst().getState().getEndDate().orElseThrow())).isTrue(); assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getStartDate().isAfter(execution.findTaskRunsByTaskId("e1").getFirst().getState().getEndDate().orElseThrow())).isTrue();
} }
@Test
@ExecuteFlow("flows/valids/parallel-fail-with-flowable.yaml")
void parallelFailWithFlowable(Execution execution) {
assertThat(execution.getState().getCurrent()).isEqualTo(State.Type.FAILED);
assertThat(execution.getTaskRunList()).hasSize(5);
// all tasks must be terminated except the Sleep that will ends later as everything is concurrent
execution.getTaskRunList().stream()
.filter(taskRun -> !"sleep".equals(taskRun.getTaskId()))
.forEach(run -> assertThat(run.getState().isTerminated()).isTrue());
}
}

View File

@@ -4,16 +4,24 @@ import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.runners.RunnerUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@KestraTest(startRunner = true)
class SubflowRunnerTest {
@@ -24,6 +32,10 @@ class SubflowRunnerTest {
@Inject
private ExecutionRepositoryInterface executionRepository;
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
@Test
@LoadFlows({"flows/valids/subflow-inherited-labels-child.yaml", "flows/valids/subflow-inherited-labels-parent.yaml"})
void inheritedLabelsAreOverridden() throws QueueException, TimeoutException {
@@ -50,4 +62,29 @@ class SubflowRunnerTest {
new Label("parentFlowLabel2", "value2") // inherited from the parent flow
);
}
@Test
@LoadFlows({"flows/valids/subflow-parent-no-wait.yaml", "flows/valids/subflow-child-with-output.yaml"})
void subflowOutputWithoutWait() throws QueueException, TimeoutException, InterruptedException {
AtomicReference<Execution> childExecution = new AtomicReference<>();
CountDownLatch countDownLatch = new CountDownLatch(1);
Runnable closing = executionQueue.receive(either -> {
if (either.isLeft() && either.getLeft().getFlowId().equals("subflow-child-with-output") && either.getLeft().getState().isTerminated()) {
childExecution.set(either.getLeft());
countDownLatch.countDown();
}
});
Execution parentExecution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "subflow-parent-no-wait");
String childExecutionId = (String) parentExecution.findTaskRunsByTaskId("subflow").getFirst().getOutputs().get("executionId");
assertThat(childExecutionId).isNotBlank();
assertThat(parentExecution.getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(parentExecution.getTaskRunList()).hasSize(1);
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
assertThat(childExecution.get().getId()).isEqualTo(childExecutionId);
assertThat(childExecution.get().getState().getCurrent()).isEqualTo(State.Type.SUCCESS);
assertThat(childExecution.get().getTaskRunList()).hasSize(1);
closing.run();
}
}

View File

@@ -206,14 +206,20 @@ class SetTest {
kv = createAndPerformSetTask("[{\"some\":\"value\"},{\"another\":\"value\"}]", KVType.JSON);
assertThat(kv.getValue(TEST_KEY).orElseThrow().value()).isEqualTo(List.of(Map.of("some", "value"), Map.of("another", "value")));
kv = createAndPerformSetTask("{{ 200 }}", KVType.STRING);
assertThat(kv.getValue(TEST_KEY).orElseThrow().value()).isEqualTo("200");
kv = createAndPerformSetTask("{{ 200.1 }}", KVType.STRING);
assertThat(kv.getValue(TEST_KEY).orElseThrow().value()).isEqualTo("200.1");
}
private KVStore createAndPerformSetTask(String value, KVType type) throws Exception {
Set set = Set.builder()
.id(Set.class.getSimpleName())
.type(Set.class.getName())
-.key(new Property<>(TEST_KEY))
+.key(Property.ofValue(TEST_KEY))
-.value(new Property<>(value))
+.value(value.contains("{{") ? Property.ofExpression(value) : Property.ofValue(value))
.kvType(Property.ofValue(type))
.build();
final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, set, null);

View File

@@ -4,6 +4,7 @@ namespace: io.kestra.tests
tasks:
- id: cache
type: io.kestra.core.runners.TaskCacheTest$CounterTask
workingDir: "{{workingDir}}"
taskCache:
enabled: true
ttl: PT1S

View File

@@ -0,0 +1,17 @@
id: flow-concurrency-queue-after-execution
namespace: io.kestra.tests
concurrency:
behavior: QUEUE
limit: 1
tasks:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: PT2S
afterExecution:
- id: afterExecution
type: io.kestra.plugin.core.output.OutputValues
values:
some: value

View File

@@ -0,0 +1,13 @@
id: flow-concurrency-queue-fail
namespace: io.kestra.tests
concurrency:
behavior: QUEUE
limit: 1
tasks:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: PT2S
- id: fail
type: io.kestra.plugin.core.execution.Fail

View File

@@ -0,0 +1,26 @@
id: for-each-item-after-execution
namespace: io.kestra.tests
inputs:
- id: file
type: FILE
- id: batch
type: INT
tasks:
- id: each
type: io.kestra.plugin.core.flow.ForEachItem
items: "{{ inputs.file }}"
batch:
rows: "{{inputs.batch}}"
namespace: io.kestra.tests
flowId: for-each-item-subflow-after-execution
wait: true
transmitFailed: true
inputs:
items: "{{ taskrun.items }}"
afterExecution:
- id: afterExecution
type: io.kestra.plugin.core.log.Log
message: Hello from afterExecution!

View File

@@ -0,0 +1,16 @@
id: for-each-item-subflow-after-execution
namespace: io.kestra.tests
inputs:
- id: items
type: STRING
tasks:
- id: per-item
type: io.kestra.plugin.core.log.Log
message: "{{ inputs.items }}"
afterExecution:
- id: afterExecution
type: io.kestra.plugin.core.log.Log
message: Hello from afterExecution!

View File

@@ -0,0 +1,28 @@
id: parallel-fail-with-flowable
namespace: io.kestra.tests
inputs:
- id: user
type: STRING
defaults: Rick Astley
tasks:
- id: parallel
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: if-1
type: io.kestra.plugin.core.flow.If
condition: "{{ inputs.user == 'Rick Astley'}}"
then:
- id: sleep
type: io.kestra.plugin.core.flow.Sleep
duration: PT1S
- id: if-2
type: io.kestra.plugin.core.flow.If
condition: "{{ inputs.user == 'Rick Astley'}}"
then:
- id: fail_missing_variable
type: io.kestra.plugin.core.log.Log
message: "{{ vars.nonexistent_variable }}"

View File

@@ -0,0 +1,12 @@
id: subflow-child-with-output
namespace: io.kestra.tests
tasks:
- id: return
type: io.kestra.plugin.core.debug.Return
format: "Some value"
outputs:
- id: flow_a_output
type: STRING
value: "{{ outputs.return.value }}"

View File

@@ -0,0 +1,9 @@
id: subflow-parent-no-wait
namespace: io.kestra.tests
tasks:
- id: subflow
type: io.kestra.plugin.core.flow.Subflow
namespace: io.kestra.tests
flowId: subflow-child-with-output
wait: false

View File

@@ -1,6 +1,6 @@
-version=0.24.0-SNAPSHOT
+version=0.24.3
org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
org.gradle.parallel=true
org.gradle.caching=true
org.gradle.priority=low

View File

@@ -4,6 +4,7 @@ import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.Variables;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.UnsupportedMessageException;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.runner.JdbcQueueTest;
@@ -31,7 +32,8 @@ class PostgresQueueTest extends JdbcQueueTest {
.build();
var exception = assertThrows(QueueException.class, () -> workerTaskResultQueue.emit(workerTaskResult));
-assertThat(exception.getMessage()).isEqualTo("Unable to emit a message to the queue");
+assertThat(exception).isInstanceOf(UnsupportedMessageException.class);
+assertThat(exception.getMessage()).contains("ERROR: unsupported Unicode escape sequence");
assertThat(exception.getCause()).isInstanceOf(DataException.class);
}
}

View File

@@ -869,8 +869,8 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
@Override
public List<Execution> lastExecutions(
-@Nullable String tenantId,
+String tenantId,
-List<FlowFilter> flows
+@Nullable List<FlowFilter> flows
) {
return this.jdbcRepository
.getDslContextWrapper()
@@ -892,14 +892,19 @@ public abstract class AbstractJdbcExecutionRepository extends AbstractJdbcReposi
.and(NORMAL_KIND_CONDITION)
.and(field("end_date").isNotNull())
.and(DSL.or(
-  flows
-    .stream()
-    .map(flow -> DSL.and(
-      field("namespace").eq(flow.getNamespace()),
-      field("flow_id").eq(flow.getId())
-    ))
-    .toList()
-));
+  ListUtils.emptyOnNull(flows).isEmpty() ?
+    DSL.trueCondition()
+    :
+    DSL.or(
+      flows.stream()
+        .map(flow -> DSL.and(
+          field("namespace").eq(flow.getNamespace()),
+          field("flow_id").eq(flow.getId())
+        ))
+        .toList()
+    )
+  )
+);
Table<Record2<Object, Integer>> cte = subquery.asTable("cte");

View File

@@ -20,6 +20,12 @@ public class AbstractJdbcExecutionRunningStorage extends AbstractJdbcRepository
this.jdbcRepository = jdbcRepository;
}
public void save(ExecutionRunning executionRunning) {
jdbcRepository.getDslContextWrapper().transaction(
configuration -> save(DSL.using(configuration), executionRunning)
);
}
public void save(DSLContext dslContext, ExecutionRunning executionRunning) {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(executionRunning);
this.jdbcRepository.persist(executionRunning, dslContext, fields);

View File

@@ -72,6 +72,8 @@ public class JdbcExecutor implements ExecutorInterface, Service {
private static final ObjectMapper MAPPER = JdbcMapper.of();
private final ScheduledExecutorService scheduledDelay = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> executionDelayFuture;
private ScheduledFuture<?> monitorSLAFuture;
@Inject
private AbstractJdbcExecutionRepository executionRepository;
@@ -312,14 +314,14 @@ public class JdbcExecutor implements ExecutorInterface, Service {
this.receiveCancellations.addFirst(this.executionRunningQueue.receive(Executor.class, this::executionRunningQueue));
this.clusterEventQueue.ifPresent(clusterEventQueueInterface -> this.receiveCancellations.addFirst(clusterEventQueueInterface.receive(this::clusterEventQueue)));
-ScheduledFuture<?> scheduledDelayFuture = scheduledDelay.scheduleAtFixedRate(
+executionDelayFuture = scheduledDelay.scheduleAtFixedRate(
this::executionDelaySend,
0,
1,
TimeUnit.SECONDS
);
-ScheduledFuture<?> scheduledSLAMonitorFuture = scheduledDelay.scheduleAtFixedRate(
+monitorSLAFuture = scheduledDelay.scheduleAtFixedRate(
this::executionSLAMonitor,
0,
1,
@@ -329,11 +331,13 @@ public class JdbcExecutor implements ExecutorInterface, Service {
// look at exceptions on the scheduledDelay thread
Thread.ofVirtual().name("jdbc-delay-exception-watcher").start(
() -> {
-Await.until(scheduledDelayFuture::isDone);
+Await.until(executionDelayFuture::isDone);
try {
-scheduledDelayFuture.get();
+executionDelayFuture.get();
-} catch (ExecutionException | InterruptedException | CancellationException e) {
+} catch (CancellationException ignored) {
+} catch (ExecutionException | InterruptedException e) {
if (e.getCause() != null && e.getCause().getClass() != CannotCreateTransactionException.class) {
log.error("Executor fatal exception in the scheduledDelay thread", e);
close();
@@ -346,11 +350,13 @@ public class JdbcExecutor implements ExecutorInterface, Service {
// look at exceptions on the scheduledSLAMonitorFuture thread
Thread.ofVirtual().name("jdbc-sla-monitor-exception-watcher").start(
() -> {
-Await.until(scheduledSLAMonitorFuture::isDone);
+Await.until(monitorSLAFuture::isDone);
try {
-scheduledSLAMonitorFuture.get();
+monitorSLAFuture.get();
-} catch (ExecutionException | InterruptedException | CancellationException e) {
+} catch (CancellationException ignored) {
+} catch (ExecutionException | InterruptedException e) {
if (e.getCause() != null && e.getCause().getClass() != CannotCreateTransactionException.class) {
log.error("Executor fatal exception in the scheduledSLAMonitor thread", e);
close();
@@ -546,7 +552,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
}
// create an SLA monitor if needed
-if (execution.getState().getCurrent() == State.Type.CREATED && !ListUtils.isEmpty(flow.getSla())) {
+if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && !ListUtils.isEmpty(flow.getSla())) {
List<SLAMonitor> monitors = flow.getSla().stream()
.filter(ExecutionMonitoringSLA.class::isInstance)
.map(ExecutionMonitoringSLA.class::cast)
@@ -562,7 +568,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
// handle concurrency limit, we need to use a different queue to be sure that execution running
// are processed sequentially so inside a queue with no parallelism
-if (execution.getState().getCurrent() == State.Type.CREATED && flow.getConcurrency() != null) {
+if ((execution.getState().getCurrent() == State.Type.CREATED || execution.getState().failedThenRestarted()) && flow.getConcurrency() != null) {
ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(executor.getFlow().getTenantId())
.namespace(executor.getFlow().getNamespace())
@@ -1065,7 +1071,11 @@ public class JdbcExecutor implements ExecutorInterface, Service {
executorService.log(log, false, executor);
}
-// the terminated state can only come from the execution queue, in this case we always have a flow in the executor
+// the terminated state can come from the execution queue, in this case we always have a flow in the executor
// or from a worker task in an afterExecution block, in this case we need to load the flow
if (executor.getFlow() == null && executor.getExecution().getState().isTerminated()) {
executor = executor.withFlow(findFlow(executor.getExecution()));
}
boolean isTerminated = executor.getFlow() != null && executionService.isTerminated(executor.getFlow(), executor.getExecution());
// purge the executionQueue
@@ -1121,8 +1131,16 @@ public class JdbcExecutor implements ExecutorInterface, Service {
executor.getFlow().getId(),
throwConsumer(queued -> {
var newExecution = queued.withState(State.Type.RUNNING);
-metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
+ExecutionRunning executionRunning = ExecutionRunning.builder()
.tenantId(newExecution.getTenantId())
.namespace(newExecution.getNamespace())
.flowId(newExecution.getFlowId())
.execution(newExecution)
.concurrencyState(ExecutionRunning.ConcurrencyState.RUNNING)
.build();
executionRunningStorage.save(executionRunning);
executionQueue.emit(newExecution);
metricRegistry.counter(MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT, MetricRegistry.METRIC_EXECUTOR_EXECUTION_POPPED_COUNT_DESCRIPTION, metricRegistry.tags(newExecution)).increment();
})
);
}
@@ -1207,13 +1225,13 @@ public class JdbcExecutor implements ExecutorInterface, Service {
try {
// Handle paused tasks
if (executionDelay.getDelayType().equals(ExecutionDelay.DelayType.RESUME_FLOW) && !pair.getLeft().getState().isTerminated()) {
FlowInterface flow = flowMetaStore.findByExecution(pair.getLeft()).orElseThrow();
if (executionDelay.getTaskRunId() == null) {
// if taskRunId is null, this means we restart a flow that was delayed at startup (scheduled on)
Execution markAsExecution = pair.getKey().withState(executionDelay.getState());
executor = executor.withExecution(markAsExecution, "pausedRestart");
} else {
// if there is a taskRun it means we restart a paused task
-FlowInterface flow = flowMetaStore.findByExecution(pair.getLeft()).orElseThrow();
Execution markAsExecution = executionService.markAs(
pair.getKey(),
flow,
@@ -1362,13 +1380,11 @@ public class JdbcExecutor implements ExecutorInterface, Service {
private Executor handleFailedExecutionFromExecutor(Executor executor, Exception e) {
Execution.FailedExecutionWithLog failedExecutionWithLog = executor.getExecution().failedExecutionFromExecutor(e);
-failedExecutionWithLog.getLogs().forEach(log -> {
-  try {
-    logQueue.emitAsync(log);
-  } catch (QueueException ex) {
-    // fail silently
-  }
-});
+try {
+  logQueue.emitAsync(failedExecutionWithLog.getLogs());
+} catch (QueueException ex) {
+  // fail silently
+}
return executor.withExecution(failedExecutionWithLog.getExecution(), "exception");
}
@@ -1386,7 +1402,7 @@ public class JdbcExecutor implements ExecutorInterface, Service {
setState(ServiceState.TERMINATING);
this.receiveCancellations.forEach(Runnable::run);
-scheduledDelay.shutdown();
+ExecutorsUtils.closeScheduledThreadPool(scheduledDelay, Duration.ofSeconds(5), List.of(executionDelayFuture, monitorSLAFuture));
setState(ServiceState.TERMINATED_GRACEFULLY);
if (log.isDebugEnabled()) {
@@ -1423,4 +1439,4 @@ public class JdbcExecutor implements ExecutorInterface, Service {
public ServiceState getState() {
return state.get();
}
}

View File

@@ -7,16 +7,13 @@ import com.google.common.collect.Iterables;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.metrics.MetricRegistry;
import io.kestra.core.models.executions.Execution;
-import io.kestra.core.queues.QueueException;
-import io.kestra.core.queues.QueueInterface;
-import io.kestra.core.queues.QueueService;
+import io.kestra.core.queues.*;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JdbcTableConfigs;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.JooqDSLContextWrapper;
-import io.kestra.core.queues.MessageTooBigException;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
@@ -42,6 +39,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwRunnable;
@Slf4j
@@ -151,6 +149,11 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
.execute();
});
} catch (DataException e) { // The exception is from the data itself, not the database/network/driver so instead of fail fast, we throw a recoverable QueueException
// Postgres refuses to store JSONB with the '\0000' codepoint as it has no textual representation.
// We try to detect that and fail with a specific exception so the Worker can recover from it.
if (e.getMessage() != null && e.getMessage().contains("ERROR: unsupported Unicode escape sequence")) {
throw new UnsupportedMessageException(e.getMessage(), e);
}
throw new QueueException("Unable to emit a message to the queue", e);
}
@@ -171,8 +174,8 @@ public abstract class JdbcQueue<T> implements QueueInterface<T> {
}
@Override
-public void emitAsync(String consumerGroup, T message) throws QueueException {
-this.asyncPoolExecutor.submit(throwRunnable(() -> this.emit(consumerGroup, message)));
+public void emitAsync(String consumerGroup, List<T> messages) throws QueueException {
+this.asyncPoolExecutor.submit(throwRunnable(() -> messages.forEach(throwConsumer(message -> this.emit(consumerGroup, message)))));
}
@Override

View File

@@ -27,7 +27,7 @@ dependencies {
// as Jackson is in the Micronaut BOM, to force its version we need to use enforcedPlatform but it didn't really work, see later :(
api enforcedPlatform("com.fasterxml.jackson:jackson-bom:$jacksonVersion")
api enforcedPlatform("org.slf4j:slf4j-api:$slf4jVersion")
-api platform("io.micronaut.platform:micronaut-platform:4.8.2")
+api platform("io.micronaut.platform:micronaut-platform:4.9.2")
api platform("io.qameta.allure:allure-bom:2.29.1")
// we define cloud bom here for GCP, Azure and AWS so they are aligned for all plugins that use them (secret, storage, oss and ee plugins)
api platform('com.google.cloud:libraries-bom:26.64.0')
@@ -148,4 +148,4 @@ dependencies {
api "io.kestra:runner-memory:$version"
api "io.kestra:storage-local:$version"
}
}

View File

@@ -38,6 +38,10 @@ public abstract class AbstractTaskRunnerTest {
@Test
protected void run() throws Exception {
var runContext = runContext(this.runContextFactory);
simpleRun(runContext);
}
private void simpleRun(RunContext runContext) throws Exception {
var commands = initScriptCommands(runContext);
Mockito.when(commands.getCommands()).thenReturn(
Property.ofValue(ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("echo 'Hello World'")))
@@ -166,6 +170,13 @@ public abstract class AbstractTaskRunnerTest {
assertThat(taskException.getLogConsumer().getOutputs().get("logOutput")).isEqualTo("Hello World");
}
@Test
protected void canWorkMultipleTimeInSameWdir() throws Exception {
var runContext = runContext(this.runContextFactory);
simpleRun(runContext);
simpleRun(runContext);
}
protected RunContext runContext(RunContextFactory runContextFactory) {
return this.runContext(runContextFactory, null);
}
@@ -236,4 +247,4 @@
protected boolean needsToSpecifyWorkingDirectory() {
return false;
}
}

ui/package-lock.json (generated)
View File

@@ -10,7 +10,7 @@
"hasInstallScript": true, "hasInstallScript": true,
"dependencies": { "dependencies": {
"@js-joda/core": "^5.6.5", "@js-joda/core": "^5.6.5",
"@kestra-io/ui-libs": "^0.0.228", "@kestra-io/ui-libs": "^0.0.232",
"@vue-flow/background": "^1.3.2", "@vue-flow/background": "^1.3.2",
"@vue-flow/controls": "^1.1.2", "@vue-flow/controls": "^1.1.2",
"@vue-flow/core": "^1.45.0", "@vue-flow/core": "^1.45.0",
@@ -1792,9 +1792,9 @@
}
},
"node_modules/@eslint/plugin-kit": {
-"version": "0.3.3",
-"resolved": "https://registry.npmjs.org/@eslint/plugin-kit/-/plugin-kit-0.3.3.tgz",
-"integrity": "sha512-1+WqvgNMhmlAambTvT3KPtCl/Ibr68VldY2XY40SL1CE0ZXiakFR/cbTspaF5HsnpDMvcYYoJHfl4980NBjGag==",
+"version": "0.3.4",
+"resolved": "https://registry.npmjs.org/@eslint/plugin-kit/-/plugin-kit-0.3.4.tgz",
+"integrity": "sha512-Ul5l+lHEcw3L5+k8POx6r74mxEYKG5kOb6Xpy2gCRW6zweT6TEhAf8vhxGgjhqrd/VO/Dirhsb+1hNpD1ue9hw==",
"dev": true,
"license": "Apache-2.0",
"dependencies": {
@@ -3133,9 +3133,9 @@
"license": "BSD-3-Clause" "license": "BSD-3-Clause"
}, },
"node_modules/@kestra-io/ui-libs": { "node_modules/@kestra-io/ui-libs": {
"version": "0.0.228", "version": "0.0.232",
"resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.228.tgz", "resolved": "https://registry.npmjs.org/@kestra-io/ui-libs/-/ui-libs-0.0.232.tgz",
"integrity": "sha512-ZSUpBEhTJ7Ul0QtMU/ioDlgryoVwZv/BD1ko96q+m9sCA4Uab1yi2LUf+ZpEEzZWH8r37E/CNK6HNjG+tei7eA==", "integrity": "sha512-4Z1DNxWEZSEEy2Tv63uNf2remxb/IqVUY01/qCaeYjLcp5axrS7Dn43N8DspA4EPdlhe4JFq2RhG13Pom+JDQA==",
"dependencies": { "dependencies": {
"@nuxtjs/mdc": "^0.16.1", "@nuxtjs/mdc": "^0.16.1",
"@popperjs/core": "^2.11.8", "@popperjs/core": "^2.11.8",

View File

@@ -24,7 +24,7 @@
},
"dependencies": {
"@js-joda/core": "^5.6.5",
-"@kestra-io/ui-libs": "^0.0.228",
+"@kestra-io/ui-libs": "^0.0.232",
"@vue-flow/background": "^1.3.2",
"@vue-flow/controls": "^1.1.2",
"@vue-flow/core": "^1.45.0",
@@ -149,7 +149,7 @@
"@popperjs/core": "npm:@sxzz/popperjs-es@^2.11.7" "@popperjs/core": "npm:@sxzz/popperjs-es@^2.11.7"
}, },
"el-table-infinite-scroll": { "el-table-infinite-scroll": {
"vue": "$vue" "vue": "^3.5.18"
}, },
"storybook": "$storybook" "storybook": "$storybook"
}, },

View File

@@ -3,34 +3,37 @@
<el-table-column v-for="(column, index) in generateTableColumns" :key="index" :prop="column" :label="column">
<template #default="scope">
<template v-if="isComplex(scope.row[column])">
-<editor
-:full-height="false"
-:input="true"
-:navbar="false"
-:model-value="JSON.stringify(scope.row[column])"
-lang="json"
-read-only
+<el-input
+type="textarea"
+:model-value="truncate(JSON.stringify(scope.row[column], null, 2))"
+readonly
+:rows="3"
+autosize
+class="ks-editor"
+resize="none"
/>
</template>
<template v-else>
-{{ scope.row[column] }}
+{{ truncate(scope.row[column]) }}
</template>
</template>
</el-table-column>
</el-table>
</template>
<script>
-import Editor from "./inputs/Editor.vue";
export default {
name: "ListPreview",
-components: {Editor},
props: {
value: {
type: Array,
required: true
}
},
data() {
return {
maxColumnLength: 100
}
},
computed: {
generateTableColumns() {
const allKeys = new Set();
@@ -43,6 +46,12 @@
methods: {
isComplex(data) {
return data instanceof Array || data instanceof Object;
},
truncate(text) {
if (typeof text !== "string") return text;
return text.length > this.maxColumnLength
? text.slice(0, this.maxColumnLength) + "..."
: text;
}
}
}

View File

@@ -48,6 +48,7 @@
v-on="activeTab['v-on'] ?? {}"
ref="tabContent"
:is="activeTab.component"
:namespace="namespaceToForward"
@go-to-detail="blueprintId => selectedBlueprintId = blueprintId"
:embed="activeTab.props && activeTab.props.embed !== undefined ? activeTab.props.embed : true"
/>
@@ -163,16 +164,11 @@
},
getTabClasses(tab) {
const isEnterpriseTab = tab.locked;
-const isGanttTab = tab.name === "gantt";
-const ROUTES = ["/flows/edit/", "/namespaces/edit/"];
-const EDIT_ROUTES = ROUTES.some(route => this.$route.path.startsWith(route));
-const isOverviewTab = EDIT_ROUTES && tab.title === "Overview";
return {
-"container": !isEnterpriseTab && !isOverviewTab,
+"container": !isEnterpriseTab,
-"mt-4": !isEnterpriseTab && !isOverviewTab,
+"mt-4": !isEnterpriseTab,
-"px-0": isEnterpriseTab && isOverviewTab,
+"px-0": isEnterpriseTab,
-"gantt-container": isGanttTab
};
},
},
@@ -209,6 +205,11 @@
Object.entries(this.$attrs)
.filter(([key]) => key !== "class")
);
},
namespaceToForward(){
return this.activeTab.props?.namespace ?? this.namespace;
// in the special case of Namespace creation on Namespaces page, the tabs are loaded before the namespace creation
// in this case this.props.namespace will be used
}
}
};

View File

@@ -1,6 +1,6 @@
<template>
<div @click="handleClick" class="d-flex my-2 p-2 rounded element" :class="{'moved': moved}">
-<div class="me-2 icon">
+<div v-if="props.parentPathComplete !== 'inputs'" class="me-2 icon">
<TaskIcon :cls="element.type" :icons only-icon />
</div>
@@ -85,6 +85,7 @@
<style scoped lang="scss">
@import "../../styles/code.scss";
@import "@kestra-io/ui-libs/src/scss/_color-palette";
.element {
cursor: pointer;
@@ -107,7 +108,8 @@
}
.playground-run-task{
-background-color: blue;
+color: $base-white;
+background-color: $base-blue-400;
height: 16px;
width: 16px;
font-size: 4px;

View File

@@ -30,7 +30,7 @@
</template>
<script setup lang="ts">
-import {onMounted, computed, inject, ref, provide} from "vue";
+import {onMounted, computed, inject, ref, provide, onActivated} from "vue";
import {useI18n} from "vue-i18n";
import {useStore} from "vuex";
import {usePluginsStore} from "../../../stores/plugins";
@@ -73,6 +73,10 @@
return !complexObject
}
onActivated(() => {
pluginsStore.updateDocumentation();
});
function onTaskUpdateField(key: string, val: any) {
const realValue = val === null || val === undefined ? undefined :
// allow array to be created with null values (specifically for metadata)
@@ -160,11 +164,8 @@
task: parsedFlow.value,
})
const fieldsFromSchemaTop = computed(() => MAIN_KEYS.map(key => getFieldFromKey(key, "main")))
const fieldsFromSchemaRest = computed(() => {
return Object.keys(pluginsStore.flowRootProperties ?? {})
.filter((key) => !MAIN_KEYS.includes(key) && !HIDDEN_FIELDS.includes(key))

View File

@@ -14,11 +14,11 @@
/>
</section>
-<Sections :key :dashboard :charts :show-default="dashboard.id === 'default'" :padding="padding" />
+<Sections ref="dashboardComponent" :dashboard :charts :show-default="dashboard.id === 'default'" :padding="padding" />
</template>
<script setup lang="ts">
-import {computed, onBeforeMount, ref} from "vue";
+import {computed, onBeforeMount, ref, useTemplateRef} from "vue";
import type {Dashboard, Chart} from "./composables/useDashboards";
import {ALLOWED_CREATION_ROUTES, getDashboard, processFlowYaml} from "./composables/useDashboards";
@@ -43,8 +43,6 @@
import YAML_FLOW from "./assets/default_flow_definition.yaml?raw";
import YAML_NAMESPACE from "./assets/default_namespace_definition.yaml?raw";
-import UTILS from "../../utils/utils.js";
import {useRoute, useRouter} from "vue-router";
const route = useRoute();
const router = useRouter();
@@ -65,21 +63,18 @@
const dashboard = ref<Dashboard>({id: "", charts: []});
const charts = ref<Chart[]>([]);
-// We use a key to force re-rendering of the Sections component
-let key = ref(UTILS.uid());
const loadCharts = async (allCharts: Chart[] = []) => {
charts.value = [];
for (const chart of allCharts) {
charts.value.push({...chart, content: stringify(chart)});
}
-refreshCharts()
};
const dashboardComponent = useTemplateRef("dashboardComponent");
const refreshCharts = () => {
-key.value = UTILS.uid();
+dashboardComponent.value!.refreshCharts();
};
const load = async (id = "default", defaultYAML = YAML_MAIN) => {

View File

@@ -92,6 +92,20 @@ export function defaultConfig(override, theme) {
);
}
export function extractState(value) {
if (!value || typeof value !== "string") return value;
if (value.includes(",")) {
const stateNames = State.arrayAllStates().map(state => state.name);
const matchedState = value.split(",")
.map(part => part.trim())
.find(part => stateNames.includes(part.toUpperCase()));
return matchedState || value;
}
return value;
}
export function chartClick(moment, router, route, event, parsedData, elements, type = "label") {
const query = {};
@@ -107,7 +121,7 @@ export function chartClick(moment, router, route, event, parsedData, elements, t
state = parsedData.labels[element.index];
}
if (state) {
-query.state = state;
+query.state = extractState(state);
query.scope = "USER";
query.size = 100;
query.page = 1;
@@ -137,7 +151,7 @@ export function chartClick(moment, router, route, event, parsedData, elements, t
}
if (event.state) {
-query.state = event.state;
+query.state = extractState(event.state);
}
if (route.query.namespace) {

View File

@@ -131,7 +131,7 @@ export function useChartGenerator(props: {chart: Chart; filters: string[]; showD
const data = ref();
const generate = async (id: string, pagination?: { pageNumber: number; pageSize: number }) => {
const filters = props.filters.concat(decodeSearchParams(route.query, undefined, []) ?? []);
-const parameters: Parameters = {...(pagination ?? {}), ...(filters ?? {})};
+const parameters: Parameters = {...(pagination ?? {}), filters: (filters ?? {})};
if (!props.showDefault) {
data.value = await dashboardStore.generate(id, props.chart.id, parameters);

View File

@@ -11,12 +11,12 @@
</template> </template>
<script lang="ts" setup> <script lang="ts" setup>
import {PropType, computed} from "vue"; import {PropType, computed, watch} from "vue";
import moment from "moment"; import moment from "moment";
import {Bar} from "vue-chartjs"; import {Bar} from "vue-chartjs";
import NoData from "../../layout/NoData.vue"; import NoData from "../../layout/NoData.vue";
import type {Chart} from "../composables/useDashboards"; import {Chart, getDashboard} from "../composables/useDashboards";
import {useChartGenerator} from "../composables/useDashboards"; import {useChartGenerator} from "../composables/useDashboards";
@@ -48,7 +48,7 @@
const DEFAULTS = { const DEFAULTS = {
display: true, display: true,
stacked: true, stacked: true,
ticks: {maxTicksLimit: 8 , stepSize: 1}, ticks: {maxTicksLimit: 8},
grid: {display: false}, grid: {display: false},
}; };
@@ -159,7 +159,19 @@
return {labels, datasets}; return {labels, datasets};
}); });
const {data: generated} = useChartGenerator(props); const {data: generated, generate} = useChartGenerator(props);
function refresh() {
return generate(getDashboard(route, "id")!);
}
defineExpose({
refresh
});
watch(() => route.params.filters, () => {
refresh();
}, {deep: true});
</script> </script>
<style lang="scss" scoped> <style lang="scss" scoped>
@@ -182,4 +194,4 @@
min-height: var(--chart-height); min-height: var(--chart-height);
max-height: var(--chart-height); max-height: var(--chart-height);
} }
</style> </style>

View File

@@ -10,12 +10,13 @@
</template>

<script setup lang="ts">
- import {PropType} from "vue";
+ import {PropType, watch} from "vue";
- import type {Chart} from "../composables/useDashboards";
+ import {Chart, getDashboard} from "../composables/useDashboards";
import {getChartTitle, getPropertyValue, useChartGenerator} from "../composables/useDashboards";
import NoData from "../../layout/NoData.vue";
+ import {useRoute} from "vue-router";

const props = defineProps({
    chart: {type: Object as PropType<Chart>, required: true},
@@ -23,7 +24,21 @@
    showDefault: {type: Boolean, default: false},
});

- const {percentageShown, EMPTY_TEXT, data} = useChartGenerator(props);
+ const route = useRoute();
+ const {percentageShown, EMPTY_TEXT, data, generate} = useChartGenerator(props);
+
+ function refresh() {
+     return generate(getDashboard(route, "id")!);
+ }
+
+ defineExpose({
+     refresh
+ });
+
+ watch(() => route.params.filters, () => {
+     refresh();
+ }, {deep: true});
</script>

<style scoped lang="scss">

View File

@@ -7,7 +7,7 @@
</template>

<script setup lang="ts">
- import {PropType, onMounted, watch, ref} from "vue";
+ import {PropType, watch, ref} from "vue";
import type {RouteLocation} from "vue-router";
@@ -34,9 +34,17 @@
    else data.value = props.chart.content ?? props.chart.source?.content;
};

- const dashboardID = (route: RouteLocation) => getDashboard(route, "id") || "default"
+ const dashboardID = (route: RouteLocation) => getDashboard(route, "id")!;

- watch(route, async (changed) => await getData(dashboardID(changed)));
- onMounted(async () => await getData(dashboardID(route)));
+ function refresh() {
+     return getData(dashboardID(route));
+ }
+
+ defineExpose({
+     refresh
+ });
+
+ watch(() => route.params.filters, () => {
+     refresh();
+ }, {deep: true, immediate: true});
</script>

View File

@@ -22,9 +22,9 @@
</template>

<script lang="ts" setup>
- import {computed,PropType} from "vue";
+ import {computed, PropType, watch} from "vue";
- import type {Chart} from "../composables/useDashboards";
+ import {Chart, getDashboard} from "../composables/useDashboards";
import {useChartGenerator} from "../composables/useDashboards";
@@ -183,7 +183,19 @@
    };
});

- const {data: generated} = useChartGenerator(props);
+ const {data: generated, generate} = useChartGenerator(props);
+
+ function refresh() {
+     return generate(getDashboard(route, "id")!);
+ }
+
+ defineExpose({
+     refresh
+ });
+
+ watch(() => route.params.filters, () => {
+     refresh();
+ }, {deep: true});
</script>

<style lang="scss" scoped>
@@ -192,4 +204,4 @@
.chart {
    max-height: $height;
}
</style>

View File

@@ -56,6 +56,7 @@
<div class="flex-grow-1">
    <component
+         ref="chartsComponents"
        :is="TYPES[chart.type as keyof typeof TYPES]"
        :chart
        :filters
@@ -89,6 +90,18 @@
import Download from "vue-material-design-icons/Download.vue";
import Pencil from "vue-material-design-icons/Pencil.vue";

+ const chartsComponents = ref<{refresh(): void}[]>();
+
+ function refreshCharts() {
+     chartsComponents.value!.forEach((component) => {
+         component.refresh();
+     });
+ }
+
+ defineExpose({
+     refreshCharts
+ });
+
const props = defineProps<{
    dashboard: Dashboard;
    charts?: Chart[];
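The chart components above and this Sections container share one pattern: each chart exposes an imperative refresh() and re-runs it whenever the route filters change, and the container fans a single refreshCharts() call out to every child through a template ref array. A condensed sketch of that pattern under those assumptions (component and variable names are illustrative, not the actual file names):

// Child chart: expose refresh() and react to filter changes.
<script setup lang="ts">
import {watch} from "vue";
import {useRoute} from "vue-router";

const route = useRoute();

async function refresh(): Promise<void> {
    // re-run this chart's data generation for the current dashboard
}

defineExpose({refresh});

watch(() => route.params.filters, () => refresh(), {deep: true});
</script>

// Parent container: a v-for'ed <component ref="chartsComponents"> collects the
// child instances into an array, so one call can refresh them all.
<script setup lang="ts">
import {ref} from "vue";

const chartsComponents = ref<{refresh(): void}[]>();

function refreshCharts(): void {
    chartsComponents.value?.forEach((component) => component.refresh());
}

defineExpose({refreshCharts});
</script>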

View File

@@ -34,7 +34,7 @@
</template>

<script lang="ts" setup>
- import {PropType, onMounted, watch, ref, computed} from "vue";
+ import {PropType, watch, ref, computed} from "vue";
import type {RouteLocation} from "vue-router";
@@ -116,16 +116,24 @@
const dashboardID = (route: RouteLocation) => getDashboard(route, "id") as string;

- const handlePageChange = async (options: { page: number; size: number }) => {
+ const handlePageChange = (options: { page: number; size: number }) => {
    if (pageNumber.value === options.page && pageSize.value === options.size) return;

    pageNumber.value = options.page;
    pageSize.value = options.size;
-     getData(dashboardID(route));
+     return getData(dashboardID(route));
};

- watch(route, async (changed) => getData(dashboardID(changed)));
- onMounted(async () => getData(dashboardID(route)));
+ function refresh() {
+     return getData(dashboardID(route));
+ }
+
+ defineExpose({
+     refresh
+ });
+
+ watch(() => route.params.filters, () => {
+     refresh();
+ }, {deep: true, immediate: true});
</script>

View File

@@ -1,35 +1,39 @@
<template>
    <div :id="containerID" />
-     <Bar
-         v-if="generated !== undefined"
-         :data="parsedData"
-         :options
-         :plugins="chartOptions?.legend?.enabled ? [customBarLegend] : []"
-         class="chart"
-         :class="chartOptions?.legend?.enabled ? 'with-legend' : ''"
-     />
-     <NoData v-else />
+     <el-tooltip
+         v-if="generated !== undefined"
+         effect="light"
+         placement="top"
+         :persistent="false"
+         :hide-after="0"
+         :popper-class="tooltipContent === '' ? 'd-none' : 'tooltip-stats'"
+         :content="tooltipContent"
+         raw-content
+     >
+         <div>
+             <Bar
+                 :data="parsedData"
+                 :options
+                 :plugins="chartOptions?.legend?.enabled ? [customBarLegend] : []"
+                 :class="props.short ? 'short-chart' : 'chart'"
+                 class="chart"
+             />
+         </div>
+     </el-tooltip>
+     <NoData v-else-if="!props.short" />
</template>

<script lang="ts" setup>
- import {PropType, computed} from "vue";
- import NoData from "../../layout/NoData.vue";
- import {Bar} from "vue-chartjs";
- import type {Chart} from "../composables/useDashboards";
- import {useChartGenerator} from "../composables/useDashboards";
- import {customBarLegend} from "../composables/useLegend";
- import {defaultConfig, getConsistentHEXColor, chartClick} from "../composables/charts.js";
- import moment from "moment";
+ import {computed, ref, watch, PropType} from "vue";
import {useRoute, useRouter} from "vue-router";
+ import moment from "moment";
+ import {Bar} from "vue-chartjs";
+ import NoData from "../../layout/NoData.vue";
+ import {Chart, getDashboard, useChartGenerator} from "../composables/useDashboards";
+ import {customBarLegend} from "../composables/useLegend";
+ import {defaultConfig, getConsistentHEXColor, chartClick, tooltip} from "../composables/charts.js";
import {cssVariable, Utils} from "@kestra-io/ui-libs";
- import KestraUtils, {useTheme} from "../../../utils/utils"
+ import KestraUtils, {useTheme} from "../../../utils/utils";

const route = useRoute();
const router = useRouter();
@@ -39,30 +43,42 @@
    chart: {type: Object as PropType<Chart>, required: true},
    filters: {type: Array as PropType<string[]>, default: () => []},
    showDefault: {type: Boolean, default: false},
+     short: {type: Boolean, default: false},
});

const containerID = `${props.chart.id}__${Math.random()}`;
+ const tooltipContent = ref("");

const {data, chartOptions} = props.chart;

- const aggregator = Object.entries(data.columns)
-     .filter(([_, v]) => v.agg)
-     .sort((a, b) => a[1].graphStyle.localeCompare(b[1].graphStyle));
- const yBShown = aggregator.length === 2;
+ const aggregator = computed(() => {
+     return Object.entries(data.columns)
+         .filter(([_, v]) => v.agg)
+         .sort((a, b) => {
+             const aStyle = a[1].graphStyle || "";
+             const bStyle = b[1].graphStyle || "";
+             return aStyle.localeCompare(bStyle);
+         });
+ });
+ const yBShown = computed(() => aggregator.value.length === 2);

const theme = useTheme();

const DEFAULTS = {
    display: true,
    stacked: true,
-     ticks: {maxTicksLimit: 8, stepSize:1},
+     ticks: {maxTicksLimit: 8},
    grid: {display: false},
};

const options = computed(() => {
    return defaultConfig({
        skipNull: true,
-         barThickness: 12,
+         barThickness: props.short ? 8 : 12,
+         maxBarThickness: props.short ? 8 : 12,
+         categoryPercentage: props.short ? 1.0 : 0.8,
+         barPercentage: props.short ? 1.0 : 0.9,
        borderSkipped: false,
        borderColor: "transparent",
        borderWidth: 2,
@@ -76,7 +92,7 @@
            }
            : {}),
        tooltip: {
-             enabled: true,
+             enabled: props.short ? false : true,
            filter: (value) => value.raw,
            callbacks: {
                label: (value) => {
@@ -84,41 +100,46 @@
                    return `${value.dataset.tooltip}`;
                },
            },
+             external: (props.short) ? function (context) {
+                 tooltipContent.value = tooltip(context.tooltip);
+             } : undefined,
        },
    },
    scales: {
        x: {
            title: {
-                 display: true,
+                 display: props.short ? false : true,
                text: data.columns[chartOptions.column].displayName ?? chartOptions.column,
            },
            position: "bottom",
-             ...DEFAULTS
+             ...DEFAULTS,
+             display: props.short ? false : true,
        },
        y: {
            title: {
-                 display: true,
-                 text: aggregator[0][1].displayName ?? aggregator[0][0],
+                 display: props.short ? false : true,
+                 text: aggregator.value[0]?.[1]?.displayName ?? aggregator.value[0]?.[0],
            },
            position: "left",
            ...DEFAULTS,
+             display: props.short ? false : true,
            ticks: {
                ...DEFAULTS.ticks,
-                 callback: value => isDuration(aggregator[0][1].field) ? Utils.humanDuration(value) : value
+                 callback: (value: any) => isDuration(aggregator.value[0]?.[1]?.field) ? Utils.humanDuration(value) : value
            }
        },
-         ...(yBShown && {
+         ...(yBShown.value && {
            yB: {
                title: {
-                     display: true,
-                     text: aggregator[1][1].displayName ?? aggregator[1][0],
+                     display: props.short ? false : true,
+                     text: aggregator.value[1]?.[1]?.displayName ?? aggregator.value[1]?.[0],
                },
                position: "right",
                ...DEFAULTS,
-                 display: true,
+                 display: props.short ? false : true,
                ticks: {
                    ...DEFAULTS.ticks,
-                     callback: value => isDuration(aggregator[1][1].field) ? Utils.humanDuration(value) : value
+                     callback: (value: any) => isDuration(aggregator.value[1]?.[1]?.field) ? Utils.humanDuration(value) : value
                }
            },
        }),
@@ -151,7 +172,7 @@
        return Array.from(new Set(values)).sort();
    })();

-     const aggregatorKeys = aggregator.map(([key]) => key);
+     const aggregatorKeys = aggregator.value.map(([key]) => key);

    const reducer = (array, field, yAxisID) => {
        if (!array.length) return;
@@ -164,8 +185,8 @@
            .filter(key => !aggregatorKeys.includes(key))
            .filter(key => key !== column);

-         return array.reduce((acc, {...params}) => {
-             const stack = `(${fields.map(field => params[field]).join(", ")}): ${aggregator.map(agg => agg[0] + " = " + (isDuration(agg[1].field) ? Utils.humanDuration(params[agg[0]]) : params[agg[0]])).join(", ")}`;
+         return array.reduce((acc: any, {...params}) => {
+             const stack = `(${fields.map(field => params[field]).join(", ")}): ${aggregator.value.map(agg => agg[0] + " = " + (isDuration(agg[1].field) ? Utils.humanDuration(params[agg[0]]) : params[agg[0]])).join(", ")}`;

            if (!acc[stack]) {
                acc[stack] = {
@@ -213,13 +234,13 @@
        });
    };

-     const yDataset = reducer(rawData, aggregator[0][0], "y");
+     const yDataset = reducer(rawData, aggregator.value[0][0], "y");

    // Sorts the dataset array by the descending sum of 'data' values.
    // If two datasets have the same sum, it sorts them alphabetically by 'label'.
-     const yDatasetData = Object.values(getData(aggregator[0][0], yDataset)).sort((a, b) => {
-         const sumA = a.data.reduce((sum, val) => sum + val, 0);
-         const sumB = b.data.reduce((sum, val) => sum + val, 0);
+     const yDatasetData = Object.values(getData(aggregator.value[0][0], yDataset)).sort((a: any, b: any) => {
+         const sumA = a.data.reduce((sum: number, val: number) => sum + val, 0);
+         const sumB = b.data.reduce((sum: number, val: number) => sum + val, 0);

        if (sumB !== sumA) {
            return sumB - sumA; // Descending by sum
@@ -228,10 +249,10 @@
        return a.label.localeCompare(b.label); // Ascending alphabetically by label
    });

-     const label = aggregator?.[1]?.[1]?.displayName ?? aggregator?.[1]?.[1]?.field;
+     const label = aggregator.value?.[1]?.[1]?.displayName ?? aggregator.value?.[1]?.[1]?.field;

    let duration: number[] = [];
-     if(yBShown){
+     if(yBShown.value){
        const helper = Array.from(new Set(rawData.map((v) => parseValue(v.date)))).sort();

        // Step 1: Group durations by formatted date
@@ -247,7 +268,7 @@
    return {
        labels: xAxis,
-         datasets: yBShown
+         datasets: yBShown.value
            ? [
                {
                    yAxisID: "yB",
@@ -257,14 +278,26 @@
                    pointRadius: 0,
                    borderWidth: 0.75,
                    label: label,
-                     borderColor: cssVariable("--ks-border-running")
+                     borderColor: props.short ? cssVariable("--ks-background-running") : cssVariable("--ks-border-running")
                },
                ...yDatasetData,
            ]
            : yDatasetData,
    };
});

- const {data: generated} = useChartGenerator(props);
+ const {data: generated, generate} = useChartGenerator(props);
+
+ function refresh() {
+     return generate(getDashboard(route, "id")!);
+ }
+
+ defineExpose({
+     refresh
+ });
+
+ watch(() => route.params.filters, () => {
+     refresh();
+ }, {deep: true});
</script>

<style lang="scss" scoped>
@@ -278,4 +311,13 @@
    min-height: var(--chart-height);
    max-height: var(--chart-height);
}
+
+ .short-chart {
+     &:not(.with-legend) {
+         #{--chart-height}: 40px;
+     }
+
+     min-height: var(--chart-height);
+     max-height: var(--chart-height);
+ }
</style>
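The short variant above turns off Chart.js's canvas tooltip and routes the content into an Element Plus tooltip instead. A stripped-down sketch of that external-tooltip wiring, with renderTooltip standing in for the tooltip() helper from composables/charts.js (the option names follow the Chart.js tooltip plugin API):

import {ref} from "vue";
import type {ChartOptions, TooltipModel} from "chart.js";

const tooltipContent = ref("");

// stand-in for the tooltip() formatter exported by composables/charts.js
function renderTooltip(model: TooltipModel<"bar">): string {
    return model.body?.map((item) => item.lines.join(" ")).join("<br>") ?? "";
}

function shortBarTooltipOptions(short: boolean): ChartOptions<"bar"> {
    return {
        plugins: {
            tooltip: {
                enabled: !short, // keep the built-in canvas tooltip for the full-size chart
                external: short
                    ? (context) => {
                        // the small chart renders this string through
                        // <el-tooltip :content="tooltipContent" raw-content>
                        tooltipContent.value = renderTooltip(context.tooltip);
                    }
                    : undefined,
            },
        },
    };
}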

View File

@@ -1,24 +1,22 @@
<template>
-     <div class="execution-pending">
    <EmptyTemplate class="queued">
        <img src="../../assets/queued_visual.svg" alt="Queued Execution">
        <h5 class="mt-4 fw-bold">
            {{ $t('execution_status') }}
            <span
                class="ms-2 px-2 py-1 rounded fs-7 fw-normal"
                :style="getStyle(execution.state.current)"
            >
                {{ execution.state.current }}
            </span>
        </h5>
        <p class="mt-4 mb-0">
            {{ $t('no_tasks_running') }}
        </p>
        <p>
            {{ $t('execution_starts_progress') }}
        </p>
    </EmptyTemplate>
-     </div>
</template>

<script setup>

View File

@@ -59,18 +59,12 @@
    this.previousExecutionId = this.$route.params.id
},
watch: {
-     $route(newValue, oldValue) {
+     $route() {
        this.executionsStore.taskRun = undefined;
-         if (oldValue.name === newValue.name && this.previousExecutionId !== this.$route.params.id) {
-             this.follow()
-         }
-         // if we change the execution id, we need to close the sse
-         if (this.executionsStore.execution && this.$route.params.id != this.executionsStore.execution.id) {
-             this.executionsStore.closeSSE();
-             window.removeEventListener("popstate", this.follow)
-             this.executionsStore.execution = undefined;
+         if (this.previousExecutionId !== this.$route.params.id) {
            this.$store.commit("flow/setFlow", undefined);
            this.$store.commit("flow/setFlowGraph", undefined);
+             this.follow();
        }
    },
},
@@ -80,13 +74,6 @@
        this.executionsStore.followExecution(this.$route.params, this.$t);
    },
    getTabs() {
-     },
- },
- computed: {
-     ...mapState("auth", ["user"]),
-     ...mapStores(useCoreStore, useExecutionsStore),
-     tabs() {
        return [
            {
                name: undefined,
@@ -135,6 +122,13 @@
                locked: true
            }
        ];
+     }
+ },
+ computed: {
+     ...mapState("auth", ["user"]),
+     ...mapStores(useCoreStore, useExecutionsStore),
+     tabs() {
+         return this.getTabs();
    },
    routeInfo() {
        const ns = this.$route.params.namespace;
@@ -212,4 +206,4 @@
.full-space {
    flex: 1 1 auto;
}
</style>

View File

@@ -58,7 +58,7 @@
</template>

<template v-if="showStatChart()" #top>
-     <Sections :dashboard="{id: 'default'}" :charts show-default />
+     <Sections ref="dashboardComponent" :dashboard="{id: 'default'}" :charts show-default />
</template>

<template #table>
@@ -260,7 +260,7 @@
    class-name="shrink"
>
    <template #default="scope">
-         <code>{{ scope.row.flowRevision }}</code>
+         <code class="code-text">{{ scope.row.flowRevision }}</code>
    </template>
</el-table-column>
@@ -293,7 +293,7 @@
        </el-tooltip>
    </template>
    <template #default="scope">
-         <code>
+         <code class="code-text">
            {{ scope.row.taskRunList?.slice(-1)[0].taskId }}
            {{
                scope.row.taskRunList?.slice(-1)[0].attempts?.length > 1 ? `(${scope.row.taskRunList?.slice(-1)[0].attempts.length})` : ""
@@ -771,6 +771,7 @@
},
refresh() {
    this.recomputeInterval = !this.recomputeInterval;
+     this.$refs.dashboardComponent.refreshCharts();
    this.load();
},
selectionMapper(execution) {
@@ -1122,6 +1123,9 @@
        color: #ffb703;
    }
}
+ .code-text {
+     color: var(--ks-content-primary);
+ }
</style>

<style lang="scss">

View File

@@ -1,51 +1,24 @@
<template>
-     <el-button size="small" type="primary" :icon="EyeOutline" @click="getFilePreview">
-         {{ $t("preview") }}
+     <el-button
+         size="small"
+         type="primary"
+         :icon="EyeOutline"
+         @click="getFilePreview"
+         :disabled="isZipFile"
+     >
+         {{ $t("preview.label") }}
    </el-button>
    <drawer
        v-if="selectedPreview === value && preview"
        v-model="isPreviewOpen"
    >
        <template #header>
-             {{ $t("preview") }}
+             {{ $t("preview.label") }}
        </template>
        <template #default>
            <el-alert v-if="preview.truncated" show-icon type="warning" :closable="false" class="mb-2">
                {{ $t('file preview truncated') }}
            </el-alert>
-             <list-preview v-if="preview.type === 'LIST'" :value="preview.content" />
-             <img v-else-if="preview.type === 'IMAGE'" :src="imageContent" alt="Image output preview">
-             <pdf-preview v-else-if="preview.type === 'PDF'" :source="preview.content" />
-             <markdown v-else-if="preview.type === 'MARKDOWN'" :source="preview.content" />
-             <editor
-                 v-else
-                 :model-value="preview.content"
-                 :lang="extensionToMonacoLang"
-                 read-only
-                 input
-                 :word-wrap="wordWrap"
-                 :full-height="false"
-                 :navbar="false"
-                 class="position-relative"
-             >
-                 <template #absolute>
-                     <CopyToClipboard :text="preview.content">
-                         <template #right>
-                             <el-tooltip
-                                 :content="$t('toggle_word_wrap')"
-                                 placement="bottom"
-                                 :auto-close="2000"
-                             >
-                                 <el-button
-                                     :icon="Wrap"
-                                     type="default"
-                                     @click="wordWrap = !wordWrap"
-                                 />
-                             </el-tooltip>
-                         </template>
-                     </CopyToClipboard>
-                 </template>
-             </editor>
            <el-form class="ks-horizontal max-size mt-3">
                <el-form-item :label="$t('row count')">
                    <el-select
@@ -81,7 +54,48 @@
                    />
                </el-select>
            </el-form-item>
+             <el-form-item :label="($t('preview.view'))">
+                 <el-switch
+                     v-model="forceEditor"
+                     class="ml-3"
+                     :active-text="$t('preview.force-editor')"
+                     :inactive-text="$t('preview.auto-view')"
+                 />
+             </el-form-item>
        </el-form>
+         <list-preview v-if="!forceEditor && preview.type === 'LIST'" :value="preview.content" />
+         <img v-else-if="!forceEditor && preview.type === 'IMAGE'" :src="imageContent" alt="Image output preview">
+         <pdf-preview v-else-if="!forceEditor && preview.type === 'PDF'" :source="preview.content" />
+         <markdown v-else-if="!forceEditor && preview.type === 'MARKDOWN'" :source="preview.content" />
+         <editor
+             v-else
+             :model-value="!forceEditor ? preview.content : JSON.stringify(preview.content, null, 2)"
+             :lang="!forceEditor ? extensionToMonacoLang : 'json'"
+             read-only
+             input
+             :word-wrap="wordWrap"
+             :full-height="false"
+             :navbar="false"
+             class="position-relative"
+         >
+             <template #absolute>
+                 <CopyToClipboard :text="!forceEditor ? preview.content : JSON.stringify(preview.content, null, 2)">
+                     <template #right>
+                         <el-tooltip
+                             :content="$t('toggle_word_wrap')"
+                             placement="bottom"
+                             :auto-close="2000"
+                         >
+                             <el-button
+                                 :icon="Wrap"
+                                 type="default"
+                                 @click="wordWrap = !wordWrap"
+                             />
+                         </el-tooltip>
+                     </template>
+                 </CopyToClipboard>
+             </template>
+         </editor>
        </template>
    </drawer>
</template>
@@ -131,7 +145,8 @@
            {value: "Cp500", label: "EBCDIC IBM-500"},
        ],
        preview: undefined,
-         wordWrap: false
+         wordWrap: false,
+         forceEditor: false
    }
},
mounted() {
@@ -163,13 +178,17 @@
    return "data:image/" + this.extension + ";base64," + this.preview.content;
},
maxPreviewOptions() {
-     return [10, 25, 100, 500, 1000, 5000, 10000, 25000, 50000].filter(value => value <= this.configPreviewMaxRows())
- }
+     return [10, 25, 50, 100, 500, 1000, 5000, 10000, 25000, 50000].filter(value => value <= this.configPreviewMaxRows())
+ },
+ isZipFile() {
+     // Checks if the file extension is .zip (case-insensitive)
+     return this.value?.toLowerCase().endsWith(".zip");
+ },
},
emits: ["preview"],
methods: {
    configPreviewInitialRows() {
-         return this.miscStore.configs?.preview.initial || 100
+         return this.miscStore.configs?.preview.initial || 50
    },
    configPreviewMaxRows() {
        return this.miscStore.configs?.preview.max || 5000
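In short, the new forceEditor switch bypasses the type-specific preview components and always renders the content as JSON in the read-only editor, while the .zip guard disables the preview button entirely. A small sketch of that rendering decision (names here are illustrative, not the component's actual API):

type PreviewType = "LIST" | "IMAGE" | "PDF" | "MARKDOWN" | "TEXT";

function previewRenderer(type: PreviewType, forceEditor: boolean): string {
    // forceEditor wins: dump the raw content as JSON into the read-only editor
    if (forceEditor) return "editor (lang: json, JSON.stringify of the content)";

    switch (type) {
        case "LIST": return "list-preview";
        case "IMAGE": return "img";
        case "PDF": return "pdf-preview";
        case "MARKDOWN": return "markdown";
        default: return "editor (lang derived from the file extension)";
    }
}

function canPreview(fileUri: string): boolean {
    // mirrors the new isZipFile computed: zip archives cannot be previewed
    return !fileUri.toLowerCase().endsWith(".zip");
}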

View File

@@ -45,8 +45,8 @@
        </el-tooltip>
    </el-form-item>
    <el-form-item>
-         <el-button-group class="min-w-auto">
-             <restart :execution="executionsStore.execution" class="ms-0" @follow="forwardEvent('follow', $event)" />
+         <el-button-group class="ks-b-group">
+             <restart v-if="executionsStore.execution" :execution="executionsStore.execution" class="ms-0" @follow="forwardEvent('follow', $event)" />
            <el-button @click="downloadContent()">
                <kicon :tooltip="$t('download logs')">
                    <download />
@@ -60,7 +60,7 @@
        </el-button-group>
    </el-form-item>
    <el-form-item>
-         <el-button-group class="min-w-auto">
+         <el-button-group class="ks-b-group">
            <el-button @click="loadLogs()">
                <kicon :tooltip="$t('refresh')">
                    <refresh />
@@ -361,4 +361,9 @@
        align-items: flex-start;
    }
}
+ .ks-b-group {
+     min-width: auto!important;
+     max-width: max-content !important;
+ }
</style>

View File

@@ -102,7 +102,8 @@
loadDefinition() {
    this.executionsStore.loadFlowForExecution({
        flowId: this.execution.flowId,
-         namespace: this.execution.namespace
+         namespace: this.execution.namespace,
+         store: true
    });
},
},

View File

@@ -37,13 +37,14 @@
    </div>
    <div class="d-flex flex-column p-3 debug">
-         <editor
+         <Editor
            ref="debugEditor"
            :full-height="false"
            :custom-height="20"
            :input="true"
            :navbar="false"
            :model-value="computedDebugValue"
+             @update:model-value="editorValue = $event"
            @confirm="onDebugExpression($event)"
            class="w-100"
        />
@@ -53,7 +54,7 @@
            :icon="Refresh"
            @click="
                onDebugExpression(
-                     debugEditor.editor.getValue(),
+                     editorValue.length > 0 ? editorValue : computedDebugValue,
                )
            "
            class="mt-3"
@@ -61,7 +62,7 @@
            {{ $t("eval.render") }}
        </el-button>
-         <editor
+         <Editor
            v-if="debugExpression"
            :read-only="true"
            :input="true"
@@ -98,7 +99,7 @@
        <VarValue
            v-if="selectedValue && displayVarValue()"
-             :value="selectedValue.uri ? selectedValue.uri : selectedValue"
+             :value="selectedValue?.uri ? selectedValue?.uri : selectedValue"
            :execution="execution"
        />
    </div>
@@ -129,8 +130,9 @@
}>();

const cascader = ref<any>(null);
- const debugEditor = ref<any>(null);
+ const debugEditor = ref<InstanceType<typeof Editor>>();
const selected = ref<string[]>([]);
+ const editorValue = ref("");
const debugExpression = ref("");
const debugError = ref("");
const debugStackTrace = ref("");
@@ -425,4 +427,4 @@
        font-size: var(--el-font-size-base);
    }
}
</style>
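The idea behind this change, in isolation: instead of reaching into the Monaco instance (debugEditor.editor.getValue()), the panel mirrors the editor text into a local ref through the update event and falls back to the computed default when nothing has been typed yet. A minimal sketch with the same names as the diff (computedDebugValue is stubbed here):

import {computed, ref} from "vue";

const editorValue = ref("");
const computedDebugValue = computed(() => "{{ outputs }}"); // stand-in default expression

// template wiring (simplified):
//   <Editor :model-value="computedDebugValue" @update:model-value="editorValue = $event" />

function expressionToRender(): string {
    // prefer what the user typed; otherwise evaluate the default expression
    return editorValue.value.length > 0 ? editorValue.value : computedDebugValue.value;
}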

View File

@@ -80,6 +80,7 @@
    :input="true"
    :navbar="false"
    :model-value="computedDebugValue"
+     @update:model-value="editorValue = $event"
    @confirm="onDebugExpression($event)"
    class="w-100"
/>
@@ -88,8 +89,9 @@
    type="primary"
    @click="
        onDebugExpression(
-             debugEditor.editor.getValue(),
+             editorValue.length > 0 ? editorValue : computedDebugValue,
        )
    "
    class="mt-3"
>
@@ -163,8 +165,9 @@
import CopyToClipboard from "../../layout/CopyToClipboard.vue";
import Editor from "../../inputs/Editor.vue";

+ const editorValue = ref("");
const debugCollapse = ref("");
- const debugEditor = ref(null);
+ const debugEditor = ref<InstanceType<typeof Editor>>();
const debugExpression = ref("");

const computedDebugValue = computed(() => {
    const formatTask = (task) => {
@@ -422,7 +425,7 @@
const displayVarValue = () =>
    isFile(selectedValue.value) ||
    selectedValue.value !== debugExpression.value;

const leftWidth = ref(70);

const startDragging = (event: MouseEvent) => {
    const startX = event.clientX;

View File

@@ -72,7 +72,7 @@
import {computed, getCurrentInstance, ref, Ref, watch} from "vue";
import Utils, {useTheme} from "../../utils/utils";
import {Buttons, Property, Shown} from "./utils/types";
- import {editor, KeyCode} from "monaco-editor/esm/vs/editor/editor.api";
+ import * as monaco from "monaco-editor";
import Items from "./segments/Items.vue";
import {cssVariable} from "@kestra-io/ui-libs";
import {LocationQuery, useRoute, useRouter} from "vue-router";
@@ -370,7 +370,7 @@
};

const theme = useTheme();
- const themeComputed: Ref<Omit<Partial<editor.IStandaloneThemeData>, "base"> & { base: ThemeBase }> = ref({
+ const themeComputed: Ref<Omit<Partial<monaco.editor.IStandaloneThemeData>, "base"> & { base: ThemeBase }> = ref({
    base: Utils.getTheme()!,
    colors: {
        "editor.background": cssVariable("--ks-background-input")!
@@ -392,7 +392,7 @@
}, {immediate: true});

- const options: editor.IStandaloneEditorConstructionOptions = {
+ const options: monaco.editor.IStandaloneEditorConstructionOptions = {
    lineNumbers: "off",
    folding: false,
    renderLineHighlight: "none",
@@ -436,7 +436,27 @@
const monacoEditor = ref<typeof MonacoEditor>();

+ const updateQuery = () => {
+     const newQuery = {
+         ...Object.fromEntries(queryParamsToKeep.value.map(key => {
+             return [
+                 key,
+                 route.query[key]
+             ]
+         })),
+         ...filterQueryString.value
+     };
+
+     if (_isEqual(route.query, newQuery)) {
+         props.buttons.refresh?.callback?.();
+         return; // Skip if the query hasn't changed
+     }
+
+     skipRouteWatcherOnce.value = true;
+     router.push({
+         query: newQuery
+     });
+ };
+
- const editorDidMount = (mountedEditor: editor.IStandaloneCodeEditor) => {
+ const editorDidMount = (mountedEditor: monaco.editor.IStandaloneCodeEditor) => {
    mountedEditor.onDidContentSizeChange((e) => {
        if (monacoEditor.value === undefined) {
            return;
@@ -445,22 +465,42 @@
            e.contentHeight + "px";
    });

-     mountedEditor.onKeyDown((e) => {
-         if (e.keyCode === KeyCode.Enter) {
-             const suggestController = mountedEditor.getContribution("editor.contrib.suggestController") as any;
-             if (suggestController && suggestController.widget) {
-                 return;
-             }
-             e.preventDefault();
-             e.stopPropagation();
-         }
-     });
+     mountedEditor.addAction({
+         id: "accept_kestra_filter",
+         label: "Accept Kestra Filter",
+         keybindingContext: "!suggestWidgetVisible",
+         keybindings: [monaco.KeyCode.Enter],
+         run: () => {
+             const model = mountedEditor.getModel();
+             if (!model) return;
+             const currentValue = model.getValue();
+             if (currentValue.trim().length > 0) {
+                 const position = mountedEditor.getPosition();
+                 const endPosition = model.getPositionAt(currentValue.length);
+                 if (
+                     position &&
+                     position.lineNumber === endPosition.lineNumber &&
+                     position.column === endPosition.column &&
+                     !currentValue.endsWith(" ")
+                 ) {
+                     mountedEditor.executeEdits("", [
+                         {
+                             range: new monaco.Range(position.lineNumber, position.column, position.lineNumber, position.column),
+                             text: " ",
+                             forceMoveMarkers: true
+                         }
+                     ]);
+                     mountedEditor.trigger("enterPressed", "editor.action.triggerSuggest", {});
+                 }
+             }
+             updateQuery();
+         }
+     });

    mountedEditor.onDidChangeModelContent(e => {
-         if (e.changes.length === 1 && e.changes[0].text === " ") {
-             const model = mountedEditor.getModel();
-             if (model && model.getValue().charAt(e.changes[0].rangeOffset - 1) === ",") {
+         if (e.changes.length === 1 && (e.changes[0].text === " " || e.changes[0].text === "\n")) {
+             if (mountedEditor.getModel()?.getValue().charAt(e.changes[0].rangeOffset - 1) === ",") {
                mountedEditor.executeEdits("", [
                    {
                        range: {
@@ -474,39 +514,10 @@
                ]);
            }
        }
-         // Remove any newlines (e.g., with paste)
-         if (e.changes.some(change => change.text.includes("\n"))) {
-             const model = mountedEditor.getModel();
-             if (model) {
-                 const currentValue = model.getValue();
-                 if (currentValue.includes("\n")) {
-                     const newValue = currentValue.replace(/\n/g, " ");
-                     model.setValue(newValue);
-                 }
-             }
-         }
    });
};

- watchDebounced(filterQueryString, () => {
-     const newQuery = {
-         ...Object.fromEntries(queryParamsToKeep.value.map(key => {
-             return [
-                 key,
-                 route.query[key]
-             ];
-         })),
-         ...filterQueryString.value
-     };
-     if (_isEqual(route.query, newQuery)) {
-         return; // Skip if the query hasn't changed
-     }
-     skipRouteWatcherOnce.value = true;
-     router.push({
-         query: newQuery
-     });
- }, {immediate: true, debounce: 1000});
+ watchDebounced(filterQueryString, updateQuery, {immediate: true, debounce: 1000});
</script>

<style lang="scss" scoped>
@@ -520,7 +531,7 @@
    border-bottom-right-radius: var(--el-border-radius-base);
    min-width: 0;

-     .mtk25, .mtk28{
+     .mtk25, .mtk28 {
        background-color: var(--ks-badge-background);
        padding: 2px 6px;
        border-radius: var(--el-border-radius-base);
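For context, the Enter handling now goes through Monaco's action API rather than a raw onKeyDown listener, so the keybinding only fires while the suggest widget is closed and Enter still accepts completions normally. A stripped-down sketch of that registration (only the keybinding wiring, without the Kestra-specific edit logic):

import * as monaco from "monaco-editor";

function registerAcceptAction(
    editor: monaco.editor.IStandaloneCodeEditor,
    onAccept: () => void, // e.g. the updateQuery() introduced above
): void {
    editor.addAction({
        id: "accept_filter", // illustrative id
        label: "Accept filter",
        // only run when the autocomplete popup is not visible
        keybindingContext: "!suggestWidgetVisible",
        keybindings: [monaco.KeyCode.Enter],
        run: () => onAccept(),
    });
}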

Some files were not shown because too many files have changed in this diff.